Skip to content

Commit

Permalink
Remove support for non-scan read columns requests (#1240)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Jan 24, 2024
1 parent adc39a9 commit 57fe793
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 183 deletions.
5 changes: 0 additions & 5 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, DataTxProfileLogThresholdMs(0, 0, 86400000)
, DataTxProfileBufferThresholdMs(0, 0, 86400000)
, DataTxProfileBufferSize(0, 1000, 100)
, ReadColumnsScanEnabled(1, 0, 1)
, ReadColumnsScanInUserPool(0, 0, 1)
, BackupReadAheadLo(0, 0, 64*1024*1024)
, BackupReadAheadHi(0, 0, 128*1024*1024)
, TtlReadAheadLo(0, 0, 64*1024*1024)
Expand Down Expand Up @@ -314,9 +312,6 @@ void TDataShard::IcbRegister() {
appData->Icb->RegisterSharedControl(HighDataSizeReportThreshlodBytes, "DataShardControls.HighDataSizeReportThreshlodBytes");
appData->Icb->RegisterSharedControl(HighDataSizeReportIntervalSeconds, "DataShardControls.HighDataSizeReportIntervalSeconds");

appData->Icb->RegisterSharedControl(ReadColumnsScanEnabled, "DataShardControls.ReadColumnsScanEnabled");
appData->Icb->RegisterSharedControl(ReadColumnsScanInUserPool, "DataShardControls.ReadColumnsScanInUserPool");

appData->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo");
appData->Icb->RegisterSharedControl(BackupReadAheadHi, "DataShardControls.BackupReadAheadHi");

Expand Down
207 changes: 32 additions & 175 deletions ydb/core/tx/datashard/datashard__read_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase<
bool InclusiveTo;
ui64 RowsLimit = 100000;
ui64 BytesLimit = 1024*1024;
ui64 Restarts = 0;
TRowVersion ReadVersion = TRowVersion::Max();

public:
Expand All @@ -200,18 +199,7 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase<

TTxType GetTxType() const override { return TXTYPE_READ_COLUMNS; }

// Forwards a precharge request for this transaction's read range to the database.
//
// The request covers keys from KeyFrom to KeyTo in forward direction, restricted to
// `valueColumns`, bounded by RowsLimit / BytesLimit, and evaluated at ReadVersion.
//
// db           - database to issue the precharge against
// localTid     - local table id passed through to db.Precharge
// valueColumns - column tags to precharge
//
// Returns whatever db.Precharge returns.
// NOTE(review): presumably `true` means the needed pages are already available and
// `false` means the caller must retry later - confirm against NTable::TDatabase.
bool Precharge(NTable::TDatabase& db, ui32 localTid, const TVector<NTable::TTag>& valueColumns) {
    // NOTE(review): the literal 0 is passed positionally after the column list;
    // its meaning is not visible here - confirm against db.Precharge's signature.
    return db.Precharge(
        localTid,
        KeyFrom, KeyTo,
        valueColumns,
        0,
        RowsLimit, BytesLimit,
        NTable::EDirection::Forward,
        ReadVersion);
}

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
bool Execute(TTransactionContext&, const TActorContext& ctx) override {
// FIXME: we need to transform HEAD into some non-repeatable snapshot here
if (!ReadVersion.IsMax() && Self->GetVolatileTxManager().HasVolatileTxsAtSnapshot(ReadVersion)) {
Self->GetVolatileTxManager().AttachWaitingSnapshotEvent(
Expand All @@ -223,38 +211,13 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase<

Result = new TEvDataShard::TEvReadColumnsResponse(Self->TabletID());

bool useScan = Self->ReadColumnsScanEnabled;

if (Self->IsFollower()) {
NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK;
TString errMessage;

if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage))
return false;

if (status != NKikimrTxDataShard::TError::OK) {
SetError(status, errMessage);
return true;
}

if (!ReadVersion.IsMax()) {
NIceDb::TNiceDb db(txc.DB);
TRowVersion lastCompleteTx;
if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteStep, lastCompleteTx.Step))
return false;
if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteTx, lastCompleteTx.TxId))
return false;

if (ReadVersion > lastCompleteTx) {
SetError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE,
TStringBuilder() << "RO replica last version " << lastCompleteTx
<< " lags behind the requested snapshot " << ReadVersion
<< " shard " << Self->TabletID());
return true;
}
}
// Note: this request is no longer supported, and it has never been used with followers
NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::WRONG_SHARD_STATE;
TString errMessage = "followers are not supported";

useScan = false;
SetError(status, errMessage);
return true;
}

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Read columns: " << Ev->Get()->Record);
Expand Down Expand Up @@ -330,26 +293,6 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase<

TSerializedCellVec toKeyCells;

if (!useScan) {
// Use histogram to limit the range for single request
const auto& sizeHistogram = tableInfo.Stats.DataStats.DataSizeHistogram;
auto histIt = LowerBound(sizeHistogram.begin(), sizeHistogram.end(), fromKeyCells,
[&tableInfo] (const NTable::TBucket& bucket, const TSerializedCellVec& key) {
TSerializedCellVec bk(bucket.EndKey);
return CompareTypedCellVectors(
bk.GetCells().data(), key.GetCells().data(),
tableInfo.KeyColumnTypes.data(),
bk.GetCells().size(), key.GetCells().size()) < 0;
});

if (histIt != sizeHistogram.end() && ++histIt != sizeHistogram.end()) {
toKeyCells.Parse(histIt->EndKey);
for (ui32 i = 0; i < toKeyCells.GetCells().size(); ++i) {
KeyTo.push_back(TRawTypeValue(toKeyCells.GetCells()[i].AsRef(), tableInfo.KeyColumnTypes[i]));
}
}
}

TVector<NTable::TTag> valueColumns;
TVector<NScheme::TTypeInfo> valueColumnTypes;
TVector<std::pair<TString, NScheme::TTypeInfo>> columns;
Expand Down Expand Up @@ -384,121 +327,35 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase<

tableInfo.Stats.AccessTime = TAppData::TimeProvider->Now();

if (useScan) {
if (snapshotKey) {
if (!Self->GetSnapshotManager().AcquireReference(*snapshotKey)) {
SetError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
TStringBuilder() << "Table id " << tableId << " has no snapshot at " << ReadVersion
<< " shard " << Self->TabletID() << (Self->IsFollower() ? " RO replica" : ""));
return true;
}
}

auto* scan = new TReadColumnsScan(TKeyBoundary{fromKeyCells, InclusiveFrom},
TKeyBoundary{toKeyCells, InclusiveTo},
valueColumns, valueColumnTypes,
std::move(blockBuilder), RowsLimit, BytesLimit,
TKeyBoundary{tableInfo.Range.To, tableInfo.Range.ToInclusive},
Ev->Sender, ctx.SelfID,
snapshotKey,
tableInfo.Path,
Self->TabletID());
auto opts = TScanOptions()
.SetResourceBroker("scan", 10)
.SetSnapshotRowVersion(ReadVersion)
.SetActorPoolId(Self->ReadColumnsScanInUserPool ? AppData(ctx)->UserPoolId : AppData(ctx)->BatchPoolId)
.SetReadAhead(512*1024, 1024*1024)
.SetReadPrio(TScanOptions::EReadPrio::Low);

ui64 cookie = -1; // Should be ignored
Self->QueueScan(localTableId, scan, cookie, opts);

Result.Destroy(); // Scan is now responsible for sending the result

return true;
}

// TODO: make sure KeyFrom and KeyTo properly reference non-inline cells data

if (!Precharge(txc.DB, localTableId, valueColumns))
return false;

size_t rows = 0;
size_t bytes = 0;
bool shardFinished = false;

{
NTable::TKeyRange iterRange;
iterRange.MinKey = KeyFrom;
iterRange.MinInclusive = InclusiveFrom;

auto iter = txc.DB.IterateRange(localTableId, iterRange, valueColumns, ReadVersion);

TString lastKeySerialized;
bool lastKeyInclusive = true;
while (iter->Next(NTable::ENext::All) == NTable::EReady::Data) {
TDbTupleRef rowKey = iter->GetKey();
lastKeySerialized = TSerializedCellVec::Serialize(rowKey.Cells());

// Compare current row with right boundary
int cmp = -1;// CompareTypedCellVectors(tuple.Columns, KeyTo.data(), tuple.Types, KeyTo.size());

if (cmp == 0 && KeyTo.size() < rowKey.ColumnCount) {
cmp = -1;
}
if (InclusiveTo) {
if (cmp > 0)
break; // Stop iff greater(cmp > 0)
} else {
if (cmp >= 0)
break; // Stop iff equal(cmp == 0) or greater(cmp > 0)
}

// Skip erased row
if (iter->Row().GetRowState() == NTable::ERowOp::Erase) {
continue;
}

TDbTupleRef rowValues = iter->GetValues();

blockBuilder->AddRow(rowKey, rowValues);

rows++;
bytes = blockBuilder->Bytes();

if (rows >= RowsLimit || bytes >= BytesLimit)
break;
}

// We don't want to do many restarts if pages weren't precharged
// So we just return whatever we read so far and the client can request more rows
if (iter->Last() == NTable::EReady::Page && rows < 1000 && bytes < 100000 && Restarts < 1) {
++Restarts;
return false;
}

if (iter->Last() == NTable::EReady::Gone) {
shardFinished = true;
lastKeySerialized = tableInfo.Range.To.GetBuffer();
lastKeyInclusive = tableInfo.Range.ToInclusive;
if (snapshotKey) {
if (!Self->GetSnapshotManager().AcquireReference(*snapshotKey)) {
SetError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
TStringBuilder() << "Table id " << tableId << " has no snapshot at " << ReadVersion
<< " shard " << Self->TabletID() << (Self->IsFollower() ? " RO replica" : ""));
return true;
}

TString buffer = blockBuilder->Finish();
buffer.resize(blockBuilder->Bytes());

Result->Record.SetBlocks(buffer);
Result->Record.SetLastKey(lastKeySerialized);
Result->Record.SetLastKeyInclusive(lastKeyInclusive);
Result->Record.SetEndOfShard(shardFinished);
}

Self->IncCounter(COUNTER_READ_COLUMNS_ROWS, rows);
Self->IncCounter(COUNTER_READ_COLUMNS_BYTES, bytes);

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
<< " Read columns result for table [" << tableInfo.Path << "]: "
<< rows << " rows, " << bytes << " bytes (event size "
<< Result->Record.GetBlocks().size() << ") shardFinished: " << shardFinished);
auto* scan = new TReadColumnsScan(TKeyBoundary{fromKeyCells, InclusiveFrom},
TKeyBoundary{toKeyCells, InclusiveTo},
valueColumns, valueColumnTypes,
std::move(blockBuilder), RowsLimit, BytesLimit,
TKeyBoundary{tableInfo.Range.To, tableInfo.Range.ToInclusive},
Ev->Sender, ctx.SelfID,
snapshotKey,
tableInfo.Path,
Self->TabletID());
auto opts = TScanOptions()
.SetResourceBroker("scan", 10)
.SetSnapshotRowVersion(ReadVersion)
.SetActorPoolId(AppData(ctx)->BatchPoolId)
.SetReadAhead(512*1024, 1024*1024)
.SetReadPrio(TScanOptions::EReadPrio::Low);

ui64 cookie = -1; // Should be ignored
Self->QueueScan(localTableId, scan, cookie, opts);

Result.Destroy(); // Scan is now responsible for sending the result

return true;
}
Expand Down
3 changes: 0 additions & 3 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2631,9 +2631,6 @@ class TDataShard
TControlWrapper DataTxProfileBufferThresholdMs;
TControlWrapper DataTxProfileBufferSize;

TControlWrapper ReadColumnsScanEnabled;
TControlWrapper ReadColumnsScanInUserPool;

TControlWrapper BackupReadAheadLo;
TControlWrapper BackupReadAheadHi;

Expand Down

0 comments on commit 57fe793

Please sign in to comment.