Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add records to change queue at Execute() stage #6497

Merged
merged 8 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/counters_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -490,4 +490,5 @@ enum ETxTypes {
TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}];
TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}];
}
187 changes: 150 additions & 37 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,39 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource()));

auto res = ChangesQueue.emplace(record.GetOrder(), record);
Y_VERIFY_S(res.second, "Duplicate change record: " << record.GetOrder());

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}

if (CommittingChangeRecords.empty()) {
db.GetDatabase().OnCommit([this] {
CommittingChangeRecords.clear();
});
db.GetDatabase().OnRollback([this] {
for (const auto order : CommittingChangeRecords) {
auto cIt = ChangesQueue.find(order);
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);

if (cIt->second.SchemaSnapshotAcquired) {
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

ChangesQueue.erase(cIt);
}

CommittingChangeRecords.clear();
});
}

CommittingChangeRecords.push_back(record.GetOrder());
} else {
auto& state = LockChangeRecords[lockId];
Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(),
Expand Down Expand Up @@ -934,6 +967,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
committed.Step = rowVersion.Step;
committed.TxId = rowVersion.TxId;
collected.push_back(committed);

auto res = ChangesQueue.emplace(committed.Order, committed);
Y_VERIFY_S(res.second, "Duplicate change record: " << committed.Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}
}

Y_VERIFY_S(!CommittedLockChangeRecords.contains(lockId), "Cannot commit lock " << lockId << " more than once");
Expand All @@ -960,7 +1001,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
LockChangeRecords.erase(it);
});
db.GetDatabase().OnRollback([this, lockId]() {
CommittedLockChangeRecords.erase(lockId);
auto it = CommittedLockChangeRecords.find(lockId);
Y_VERIFY_S(it != CommittedLockChangeRecords.end(), "Unexpected failure to find lockId# " << lockId);

for (size_t i = 0; i < it->second.Count; ++i) {
const ui64 order = it->second.Order + i;

auto cIt = ChangesQueue.find(order);
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);

if (cIt->second.SchemaSnapshotAcquired) {
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

ChangesQueue.erase(cIt);
}

CommittedLockChangeRecords.erase(it);
});
}

Expand Down Expand Up @@ -994,7 +1054,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {

auto it = ChangesQueue.find(order);
if (it == ChangesQueue.end()) {
Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order);
return;
}

Expand Down Expand Up @@ -1022,23 +1081,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
ChangesQueueBytes -= record.BodySize;

if (record.SchemaSnapshotAcquired) {
Y_ABORT_UNLESS(record.TableId);
auto tableIt = TableInfos.find(record.TableId.LocalPathId);

if (tableIt != TableInfos.end()) {
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey);

if (last) {
const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey);
Y_ABORT_UNLESS(snapshot);

if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) {
SchemaSnapshotManager.RemoveShapshot(db, snapshotKey);
}
}
} else {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
if (const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

Expand All @@ -1059,7 +1104,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
CheckChangesQueueNoOverflow();
}

void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie, bool afterMove) {
if (!records) {
return;
}
Expand All @@ -1079,27 +1124,24 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
const auto now = AppData()->TimeProvider->Now();
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
for (const auto& record : records) {
forward.emplace_back(record.Order, record.PathId, record.BodySize);
auto it = ChangesQueue.find(record.Order);
if (it == ChangesQueue.end()) {
Y_ABORT_UNLESS(afterMove);
continue;
}

auto res = ChangesQueue.emplace(
std::piecewise_construct,
std::forward_as_tuple(record.Order),
std::forward_as_tuple(record, now, cookie)
);
if (res.second) {
ChangesList.PushBack(&res.first->second);
forward.emplace_back(record.Order, record.PathId, record.BodySize);

Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
it->second.EnqueuedAt = now;
it->second.ReservationCookie = cookie;
ChangesList.PushBack(&it->second);

if (record.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(record.TableId, record.SchemaVersion));
}
}
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
}

if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
Y_ABORT_UNLESS(!afterMove);
ChangeQueueReservedCapacity -= it->second;
ChangeQueueReservedCapacity += records.size();
}
Expand Down Expand Up @@ -1265,6 +1307,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
.SchemaVersion = schemaVersion,
});

auto res = ChangesQueue.emplace(records.back().Order, records.back());
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}

if (!rowset.Next()) {
return false;
}
Expand Down Expand Up @@ -1363,6 +1413,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
});
entry.Count++;
needSort = true;

auto res = ChangesQueue.emplace(records.back().Order, records.back());
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}
}

LockChangeRecords.erase(lockId);
Expand Down Expand Up @@ -1421,6 +1479,51 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
}
}

void TDataShard::ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key) {
Y_ABORT_UNLESS(!SchemaSnapshotManager.HasReference(key));

const auto* snapshot = SchemaSnapshotManager.FindSnapshot(key);
Y_ABORT_UNLESS(snapshot);

auto it = TableInfos.find(key.PathId);
if (it == TableInfos.end()) {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
return;
}

if (snapshot->Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) {
bool wasEmpty = PendingSchemaSnapshotsToGc.empty();
PendingSchemaSnapshotsToGc.push_back(key);
if (wasEmpty) {
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
}
}
}

void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots() {
bool wasEmpty = PendingSchemaSnapshotsToGc.empty();

for (const auto& [key, snapshot] : SchemaSnapshotManager.GetSnapshots()) {
auto it = TableInfos.find(key.PathId);
if (it == TableInfos.end()) {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
break;
}
if (SchemaSnapshotManager.HasReference(key)) {
continue;
}
if (snapshot.Schema->GetTableSchemaVersion() >= it->second->GetTableSchemaVersion()) {
continue;
}

PendingSchemaSnapshotsToGc.push_back(key);
}

if (wasEmpty && !PendingSchemaSnapshotsToGc.empty()) {
Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
}
}

void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) {
db.Table<Schema::SchemaOperations>().Key(op.TxId).Update(
NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success),
Expand Down Expand Up @@ -1649,8 +1752,18 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio
Y_ABORT_UNLESS(TableInfos.contains(pathId.LocalPathId));
auto tableInfo = TableInfos[pathId.LocalPathId];

const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, tableSchemaVersion);
const auto key = TSchemaSnapshotKey(pathId, tableSchemaVersion);
SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId));

const auto& snapshots = SchemaSnapshotManager.GetSnapshots();
for (auto it = snapshots.lower_bound(TSchemaSnapshotKey(pathId, 1)); it != snapshots.end(); ++it) {
if (it->first == key) {
break;
}
if (!SchemaSnapshotManager.HasReference(it->first)) {
ScheduleRemoveSchemaSnapshot(it->first);
}
}
}

void TDataShard::PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid) {
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/tx/datashard/datashard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
return false;
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
if (!Self->SchemaSnapshotManager.Load(db)) {
return false;
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecords::TableId)) {
if (!Self->LoadChangeRecords(db, ChangeRecords)) {
return false;
Expand Down Expand Up @@ -512,12 +518,6 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
if (!Self->SchemaSnapshotManager.Load(db)) {
return false;
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
TDataShardLocksDb locksDb(*Self, txc);
if (!Self->SysLocks.Load(locksDb)) {
Expand Down Expand Up @@ -547,6 +547,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
Self->SubscribeNewLocks();

Self->ScheduleRemoveAbandonedLockChanges();
Self->ScheduleRemoveAbandonedSchemaSnapshots();

return true;
}
Expand Down
14 changes: 6 additions & 8 deletions ydb/core/tx/datashard/datashard_change_sending.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,13 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
Self->RemoveChangeRecordsInFly = false;
}

if (!Self->ChangesQueue) { // double check queue
if (ChangeExchangeSplit) {
Self->KillChangeSender(ctx);
Self->ChangeExchangeSplitter.DoSplit(ctx);
}
if (ChangeExchangeSplit) {
Self->KillChangeSender(ctx);
Self->ChangeExchangeSplitter.DoSplit(ctx);
}

for (const auto dstTabletId : ActivationList) {
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}
for (const auto dstTabletId : ActivationList) {
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}

Self->CheckStateChange(ctx);
Expand Down
Loading
Loading