Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add records to change queue at Execute() stage #6497

Merged
merged 8 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/counters_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -490,4 +490,5 @@ enum ETxTypes {
TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}];
TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}];
}
148 changes: 116 additions & 32 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,28 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource()));

auto res = ChangesQueue.emplace(record.GetOrder(), record);
Y_VERIFY_S(res.second, "Duplicate change record: " << record.GetOrder());

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}

db.GetDatabase().OnRollback([this, order = record.GetOrder()] {
auto it = ChangesQueue.find(order);
Y_VERIFY_S(it != ChangesQueue.end(), "Cannot find change record: " << order);

if (it->second.SchemaSnapshotAcquired) {
const auto snapshotKey = TSchemaSnapshotKey(it->second.TableId, it->second.SchemaVersion);
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

ChangesQueue.erase(it);
});
} else {
auto& state = LockChangeRecords[lockId];
Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(),
Expand Down Expand Up @@ -934,6 +956,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
committed.Step = rowVersion.Step;
committed.TxId = rowVersion.TxId;
collected.push_back(committed);

auto res = ChangesQueue.emplace(committed.Order, committed);
Y_VERIFY_S(res.second, "Duplicate change record: " << committed.Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}
}

Y_VERIFY_S(!CommittedLockChangeRecords.contains(lockId), "Cannot commit lock " << lockId << " more than once");
Expand All @@ -960,7 +990,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
LockChangeRecords.erase(it);
});
db.GetDatabase().OnRollback([this, lockId]() {
CommittedLockChangeRecords.erase(lockId);
auto it = CommittedLockChangeRecords.find(lockId);
Y_VERIFY_S(it != CommittedLockChangeRecords.end(), "Unexpected failure to find lockId# " << lockId);

for (size_t i = 0; i < it->second.Count; ++i) {
const ui64 order = it->second.Order + i;

auto cIt = ChangesQueue.find(order);
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);

if (cIt->second.SchemaSnapshotAcquired) {
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

ChangesQueue.erase(cIt);
}

CommittedLockChangeRecords.erase(it);
});
}

Expand Down Expand Up @@ -1022,23 +1071,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
ChangesQueueBytes -= record.BodySize;

if (record.SchemaSnapshotAcquired) {
Y_ABORT_UNLESS(record.TableId);
auto tableIt = TableInfos.find(record.TableId.LocalPathId);

if (tableIt != TableInfos.end()) {
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey);

if (last) {
const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey);
Y_ABORT_UNLESS(snapshot);

if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) {
SchemaSnapshotManager.RemoveShapshot(db, snapshotKey);
}
}
} else {
Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
if (const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
ScheduleRemoveSchemaSnapshot(snapshotKey);
}
}

Expand Down Expand Up @@ -1081,22 +1116,15 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
for (const auto& record : records) {
forward.emplace_back(record.Order, record.PathId, record.BodySize);

auto res = ChangesQueue.emplace(
std::piecewise_construct,
std::forward_as_tuple(record.Order),
std::forward_as_tuple(record, now, cookie)
);
if (res.second) {
ChangesList.PushBack(&res.first->second);
auto it = ChangesQueue.find(record.Order);
Y_ABORT_UNLESS(it != ChangesQueue.end());

Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
it->second.EnqueuedAt = now;
it->second.ReservationCookie = cookie;
ChangesList.PushBack(&it->second);

if (record.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(record.TableId, record.SchemaVersion));
}
}
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
}

if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
Expand Down Expand Up @@ -1265,6 +1293,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
.SchemaVersion = schemaVersion,
});

auto res = ChangesQueue.emplace(records.back().Order, records.back());
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}

if (!rowset.Next()) {
return false;
}
Expand Down Expand Up @@ -1363,6 +1399,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
});
entry.Count++;
needSort = true;

auto res = ChangesQueue.emplace(records.back().Order, records.back());
Y_VERIFY_S(res.second, "Duplicate change record: " << records.back().Order);

if (res.first->second.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
}
}

LockChangeRecords.erase(lockId);
Expand Down Expand Up @@ -1421,6 +1465,46 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
}
}

// Queues the schema snapshot identified by `key` for deferred removal.
//
// Preconditions (enforced with Y_ABORT_UNLESS): the key must no longer have
// any reference in SchemaSnapshotManager, and the snapshot itself must still
// be known to the manager.
//
// The key is appended to PendingSchemaSnapshotsToRemove only when the
// snapshot's table schema version is lower than the version currently
// recorded in TableInfos for the same table. A TEvRemoveSchemaSnapshots event
// is sent to self only when the pending list was empty before the append.
void TDataShard::ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key) {
    Y_ABORT_UNLESS(!SchemaSnapshotManager.HasReference(key));

    const auto* snapshot = SchemaSnapshotManager.FindSnapshot(key);
    Y_ABORT_UNLESS(snapshot);

    const auto tableIt = TableInfos.find(key.PathId);
    if (tableIt == TableInfos.end()) {
        // A missing table is only expected while the shard is PreOffline
        // (checked in debug builds); nothing to schedule in that case.
        Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
        return;
    }

    const bool outdated =
        snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion();
    if (!outdated) {
        return;
    }

    // Remember whether the queue was empty *before* pushing: only the
    // empty -> non-empty transition sends the removal event.
    const bool queueWasEmpty = PendingSchemaSnapshotsToRemove.empty();
    PendingSchemaSnapshotsToRemove.push_back(key);
    if (queueWasEmpty) {
        Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
    }
}

// Scans every snapshot known to SchemaSnapshotManager and queues the
// abandoned ones for removal.
//
// A snapshot is queued when all of the following hold:
//   * its table is still present in TableInfos;
//   * nothing holds a reference to it;
//   * its table schema version is lower than the table's current version.
//
// A TEvRemoveSchemaSnapshots event is sent to self only when the pending
// list was empty on entry and this scan added at least one key.
void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots() {
    const bool wasEmpty = PendingSchemaSnapshotsToRemove.empty();

    for (const auto& [key, snapshot] : SchemaSnapshotManager.GetSnapshots()) {
        const auto it = TableInfos.find(key.PathId);
        if (it == TableInfos.end()) {
            // A missing table is only expected while the shard is PreOffline
            // (checked in debug builds); the scan stops here.
            Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
            break;
        }

        // Snapshots that are still referenced (e.g. by change records that
        // acquired a reference when they were loaded or persisted) must not
        // be queued: ScheduleRemoveSchemaSnapshot() asserts that a key has no
        // reference at the moment it is queued, and it is invoked when the
        // last reference is released, so such snapshots get scheduled later.
        // NOTE(review): the TEvRemoveSchemaSnapshots handler is not visible
        // here — confirm whether it re-checks references on its own.
        if (SchemaSnapshotManager.HasReference(key)) {
            continue;
        }

        if (snapshot.Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) {
            PendingSchemaSnapshotsToRemove.push_back(key);
        }
    }

    if (wasEmpty && !PendingSchemaSnapshotsToRemove.empty()) {
        Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
    }
}

void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) {
db.Table<Schema::SchemaOperations>().Key(op.TxId).Update(
NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success),
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/tx/datashard/datashard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
return false;
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
if (!Self->SchemaSnapshotManager.Load(db)) {
return false;
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecords::TableId)) {
if (!Self->LoadChangeRecords(db, ChangeRecords)) {
return false;
Expand Down Expand Up @@ -512,12 +518,6 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) {
if (!Self->SchemaSnapshotManager.Load(db)) {
return false;
}
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
TDataShardLocksDb locksDb(*Self, txc);
if (!Self->SysLocks.Load(locksDb)) {
Expand Down Expand Up @@ -547,6 +547,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
Self->SubscribeNewLocks();

Self->ScheduleRemoveAbandonedLockChanges();
Self->ScheduleRemoveAbandonedSchemaSnapshots();

return true;
}
Expand Down
14 changes: 6 additions & 8 deletions ydb/core/tx/datashard/datashard_change_sending.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,13 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
Self->RemoveChangeRecordsInFly = false;
}

if (!Self->ChangesQueue) { // double check queue
if (ChangeExchangeSplit) {
Self->KillChangeSender(ctx);
Self->ChangeExchangeSplitter.DoSplit(ctx);
}
if (ChangeExchangeSplit) {
Self->KillChangeSender(ctx);
Self->ChangeExchangeSplitter.DoSplit(ctx);
}

for (const auto dstTabletId : ActivationList) {
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}
for (const auto dstTabletId : ActivationList) {
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}

Self->CheckStateChange(ctx);
Expand Down
32 changes: 24 additions & 8 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class TDataShard
class TTxCdcStreamScanProgress;
class TTxCdcStreamEmitHeartbeats;
class TTxUpdateFollowerReadEdge;
class TTxRemoveSchemaSnapshots;

template <typename T> friend class TTxDirectBase;
class TTxUploadRows;
Expand Down Expand Up @@ -374,6 +375,7 @@ class TDataShard
EvPlanPredictedTxs,
EvStatisticsScanFinished,
EvTableStatsError,
EvRemoveSchemaSnapshots,
EvEnd
};

Expand Down Expand Up @@ -595,6 +597,8 @@ class TDataShard
struct TEvPlanPredictedTxs : public TEventLocal<TEvPlanPredictedTxs, EvPlanPredictedTxs> {};

struct TEvStatisticsScanFinished : public TEventLocal<TEvStatisticsScanFinished, EvStatisticsScanFinished> {};

struct TEvRemoveSchemaSnapshots : public TEventLocal<TEvRemoveSchemaSnapshots, EvRemoveSchemaSnapshots> {};
};

struct Schema : NIceDb::Schema {
Expand Down Expand Up @@ -1383,6 +1387,8 @@ class TDataShard

void Handle(TEvPrivate::TEvPlanPredictedTxs::TPtr& ev, const TActorContext& ctx);

void Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr& ev, const TActorContext& ctx);

void HandleByReplicationSourceOffsetsServer(STATEFN_SIG);

void DoPeriodicTasks(const TActorContext &ctx);
Expand Down Expand Up @@ -1920,6 +1926,8 @@ class TDataShard
bool LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShardChangeCollector::TChange>& records);
void ScheduleRemoveLockChanges(ui64 lockId);
void ScheduleRemoveAbandonedLockChanges();
void ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key);
void ScheduleRemoveAbandonedSchemaSnapshots();

static void PersistCdcStreamScanLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value,
const TPathId& tablePathId, const TPathId& streamPathId);
Expand Down Expand Up @@ -2804,24 +2812,29 @@ class TDataShard
ui64 LockOffset;
ui64 ReservationCookie;

explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId,
ui64 schemaVersion, TInstant created, TInstant enqueued,
ui64 lockId = 0, ui64 lockOffset = 0, ui64 cookie = 0)
explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, ui64 schemaVersion,
TInstant created, ui64 lockId = 0, ui64 lockOffset = 0)
: BodySize(bodySize)
, TableId(tableId)
, SchemaVersion(schemaVersion)
, SchemaSnapshotAcquired(false)
, CreatedAt(created)
, EnqueuedAt(enqueued)
, EnqueuedAt(TInstant::Zero())
, LockId(lockId)
, LockOffset(lockOffset)
, ReservationCookie(cookie)
, ReservationCookie(0)
{
}

explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record)
: TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion,
record.CreatedAt(), record.LockId, record.LockOffset)
{
}

explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now, ui64 cookie)
: TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, record.CreatedAt(), now,
record.LockId, record.LockOffset, cookie)
explicit TEnqueuedRecord(const TChangeRecord& record)
: TEnqueuedRecord(record.GetBody().size(), record.GetTableId(), record.GetSchemaVersion(),
record.GetApproximateCreationDateTime(), record.GetLockId(), record.GetLockOffset())
{
}
};
Expand Down Expand Up @@ -2866,6 +2879,7 @@ class TDataShard
THashMap<ui64, TUncommittedLockChangeRecords> LockChangeRecords; // ui64 is lock id
THashMap<ui64, TCommittedLockChangeRecords> CommittedLockChangeRecords; // ui64 is lock id
TVector<ui64> PendingLockChangeRecordsToRemove;
TVector<TSchemaSnapshotKey> PendingSchemaSnapshotsToRemove;

// in
THashMap<ui64, TInChangeSender> InChangeSenders; // ui64 is shard id
Expand Down Expand Up @@ -2986,6 +3000,7 @@ class TDataShard
HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle);
HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle);
HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateInactive unhandled event type: " << ev->GetTypeRewrite()
Expand Down Expand Up @@ -3114,6 +3129,7 @@ class TDataShard
HFunc(TEvPrivate::TEvPlanPredictedTxs, Handle);
HFunc(NStat::TEvStatistics::TEvStatisticsRequest, Handle);
HFunc(TEvPrivate::TEvStatisticsScanFinished, Handle);
HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString());
Expand Down
Loading
Loading