Skip to content

Commit

Permalink
EvWriteRows
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin committed Jan 11, 2024
1 parent c9a3d7f commit c9c7188
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 31 deletions.
8 changes: 5 additions & 3 deletions ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
auto opts = TShardedTableOptions();
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);

auto upsertObserver = EvWrite ? ReplaceEvProposeTransactionWithEvWrite(runtime) : TTestActorRuntimeBase::TEventObserverHolder();
auto upsertResultObserver = EvWrite ? ReplaceEvProposeTransactionResultWithEvWrite(runtime) : TTestActorRuntimeBase::TEventObserverHolder();
EvWriteRows rows;
if (EvWrite)
rows = {{{0, 1}}, {{2, 3}}, {{4, 5}}};

// Write(runtime, sender, shards[0], tableId, opts.Columns_, 3, 100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
auto upsertObserver = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
auto upsertResultObserver = ReplaceEvProposeTransactionResultWithEvWrite(runtime, rows);

ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 3);"));
Expand Down
62 changes: 36 additions & 26 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1878,8 +1878,13 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sen
return Write(runtime, sender, shardId, std::move(request), expectedStatus, std::move(traceId));
}

TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime) {
return TTestActorRuntimeBase::TEventObserverHolder(runtime.AddObserver([](TAutoPtr<IEventHandle>& event) {


TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, EvWriteRows& rows) {
if (rows.empty())
return {};

return TTestActorRuntimeBase::TEventObserverHolder(runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
if (event->GetTypeRewrite() != TEvDataShard::EvProposeTransaction)
return;

Expand All @@ -1894,37 +1899,37 @@ TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWri
const TString& txBody = record.GetTxBody();
NKikimrTxDataShard::TDataTransaction tx;
Y_VERIFY(tx.ParseFromArray(txBody.data(), txBody.size()));
const auto& tasks = tx.GetKqpTransaction().GetTasks();
Y_VERIFY_S(tasks.size() == 1, "Only 1 task is supported");
const auto& task = tasks[0];
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
Y_VERIFY(task.GetMeta().UnpackTo(&meta));
const auto& tableMeta = meta.GetTable();
ui64 tableId = tableMeta.GetTableId().GetTableId();
const auto& writes = meta.GetWrites();
ui16 colCount = writes.GetColumns().size();

// Construct new EvWrite
ui64 txId = record.GetTxId();
auto txMode = NKikimr::NDataShard::EvWrite::Convertor::GetTxMode(record.GetFlags());
ui32 rowCount = 1;
std::vector<ui32> columnIds(colCount);
std::iota(columnIds.begin(), columnIds.end(), 1);

TVector<TString> stringValues;
TVector<TCell> cells;
for (ui32 row = 0; row < rowCount; ++row) {
for (ui32 col = 0; col < colCount; ++col) {
ui32 value32 = 1;
cells.emplace_back(TCell((const char*)&value32, sizeof(ui32)));
}
ui64 tableId = 0;
ui16 colCount = 0;
for (const auto& task : tx.GetKqpTransaction().GetTasks()) {
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
Y_VERIFY(task.GetMeta().UnpackTo(&meta));
const auto& tableMeta = meta.GetTable();
Y_VERIFY_S(tableId == 0 || tableId == tableMeta.GetTableId().GetTableId(), "Only writes to one table is supported now");
tableId = tableMeta.GetTableId().GetTableId();
const auto& writes = meta.GetWrites();
Y_VERIFY_S(colCount == 0 || colCount == writes.GetColumns().size(), "Only equal column count is supported now.");
colCount = writes.GetColumns().size();

const auto& row = rows.ProcessNextRow();
Y_VERIFY(row.Cells.size() == colCount);
std::copy(row.Cells.begin(), row.Cells.end(), std::back_inserter(cells));
}
Y_VERIFY(!cells.empty());

TSerializedCellMatrix matrix(cells, rowCount, colCount);
TSerializedCellMatrix matrix(cells, cells.size() / colCount, colCount);
TString blobData = matrix.ReleaseBuffer();

UNIT_ASSERT(blobData.size() < 8_MB);

ui64 txId = record.GetTxId();
auto txMode = NKikimr::NDataShard::EvWrite::Convertor::GetTxMode(record.GetFlags());
std::vector<ui32> columnIds(colCount);
std::iota(columnIds.begin(), columnIds.end(), 1);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
Expand All @@ -1936,11 +1941,16 @@ TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWri
}));
}

TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime) {
return TTestActorRuntimeBase::TEventObserverHolder(runtime.AddObserver([](TAutoPtr<IEventHandle>& event) {
TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, EvWriteRows& rows) {
if (rows.empty())
return {};

return TTestActorRuntimeBase::TEventObserverHolder(runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
if (event->GetTypeRewrite() != NEvents::TDataEvents::EvWriteResult)
return;

rows.CompleteNextRow();

const auto& record = event->Get<NEvents::TDataEvents::TEvWriteResult>()->Record;
Cerr << "EvWriteResult event is observed and will be replaced with EvProposeTransactionResult: " << record.ShortDebugString() << Endl;

Expand Down
38 changes: 36 additions & 2 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -714,8 +714,42 @@ void ExecSQL(Tests::TServer::TPtr server,
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});

TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime);
TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime);
struct EvWriteRow {
EvWriteRow(std::initializer_list<ui32> init) {
for (ui32 value : init) {
Cells.emplace_back(TCell((const char*)&value, sizeof(ui32)));
}
}

std::vector<TCell> Cells;

enum EStatus {
Init,
Processing,
Completed
} Status = Init;
};
class EvWriteRows : public std::vector<EvWriteRow> {
public:
EvWriteRows() = default;
EvWriteRows(std::initializer_list<EvWriteRow> init) :
std::vector<EvWriteRow>(init) { }

const EvWriteRow& ProcessNextRow() {
auto processedRow = std::find_if(begin(), end(), [](const auto& row) { return row.Status == EvWriteRow::EStatus::Init; });
Y_VERIFY_S(processedRow != end(), "There should be at lest one EvWrite row to process.");
processedRow->Status = EvWriteRow::EStatus::Processing;
return *processedRow;
}
void CompleteNextRow() {
auto processedRow = std::find_if(begin(), end(), [](const auto& row) { return row.Status == EvWriteRow::EStatus::Processing; });
Y_VERIFY_S(processedRow != end(), "There should be at lest one EvWrite row processing.");
processedRow->Status = EvWriteRow::EStatus::Completed;
}
};

TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, EvWriteRows& rows);
TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, EvWriteRows& rows);

void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);

Expand Down

0 comments on commit c9c7188

Please sign in to comment.