Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jul 10, 2024
1 parent ea10ee3 commit 6cf5aad
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 9 deletions.
15 changes: 9 additions & 6 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {

auto it = ChangesQueue.find(order);
if (it == ChangesQueue.end()) {
Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order);
return;
}

Expand Down Expand Up @@ -1111,7 +1110,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
CheckChangesQueueNoOverflow();
}

void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie, bool afterMove) {
if (!records) {
return;
}
Expand All @@ -1131,10 +1130,13 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
const auto now = AppData()->TimeProvider->Now();
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
for (const auto& record : records) {
forward.emplace_back(record.Order, record.PathId, record.BodySize);

auto it = ChangesQueue.find(record.Order);
Y_ABORT_UNLESS(it != ChangesQueue.end());
if (it == ChangesQueue.end()) {
Y_ABORT_UNLESS(afterMove);
continue;
}

forward.emplace_back(record.Order, record.PathId, record.BodySize);

it->second.EnqueuedAt = now;
it->second.ReservationCookie = cookie;
Expand All @@ -1143,8 +1145,9 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
}

if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
Y_ABORT_UNLESS(!afterMove);
ChangeQueueReservedCapacity -= it->second;
ChangeQueueReservedCapacity += records.size();
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1912,7 +1912,8 @@ class TDataShard
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId);
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId);
void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order);
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie = 0);
// TODO(ilnaz): remove 'afterMove' after #6541
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie = 0, bool afterMove = false);
ui32 GetFreeChangeQueueCapacity(ui64 cookie);
ui64 ReserveChangeQueueCapacity(ui32 capacity);
void UpdateChangeExchangeLag(TInstant now);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/move_index_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class TMoveIndexUnit : public TExecutionUnit {
void Complete(TOperation::TPtr, const TActorContext& ctx) override {
DataShard.CreateChangeSender(ctx);
DataShard.MaybeActivateChangeSender(ctx);
DataShard.EnqueueChangeRecords(std::move(ChangeRecords));
DataShard.EnqueueChangeRecords(std::move(ChangeRecords), 0, true);
}
};

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/move_table_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class TMoveTableUnit : public TExecutionUnit {
void Complete(TOperation::TPtr, const TActorContext& ctx) override {
DataShard.CreateChangeSender(ctx);
DataShard.MaybeActivateChangeSender(ctx);
DataShard.EnqueueChangeRecords(std::move(ChangeRecords));
DataShard.EnqueueChangeRecords(std::move(ChangeRecords), 0, true);
}
};

Expand Down

0 comments on commit 6cf5aad

Please sign in to comment.