Skip to content

Commit

Permalink
Tests: Replace EvProposeTransaction with EvWrite (ydb-platform#932)
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Jan 12, 2024
1 parent 3a4b1a7 commit db4e909
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 6 deletions.
23 changes: 23 additions & 0 deletions ydb/core/tx/datashard/datashard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,27 @@ ui64 EvWrite::Convertor::GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode tx
Y_FAIL_S("Unexpected tx mode " << txMode);
}
}

NKikimrDataEvents::TEvWrite::ETxMode EvWrite::Convertor::GetTxMode(ui64 flags) {
if ((flags & TTxFlags::Immediate) && !(flags & TTxFlags::ForceOnline)) {
return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_IMMEDIATE;
}
else if (flags & TTxFlags::VolatilePrepare) {
return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_VOLATILE_PREPARE;
}
else {
return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_PREPARE;
}
}

NKikimrTxDataShard::TEvProposeTransactionResult::EStatus EvWrite::Convertor::GetStatus(NKikimrDataEvents::TEvWriteResult::EStatus status) {
switch (status) {
case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED:
return NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE;
case NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED:
return NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED;
default:
return NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
}
}
}
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1570,7 +1570,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
Y_ABORT_UNLESS(writeTx);

auto badRequest = [&](const TString& error) {
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << "at tablet# " << Self->TabletID(), Self->TabletID());
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << " at tablet# " << Self->TabletID(), Self->TabletID());
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
};

Expand Down
21 changes: 21 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,27 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
return {runtime, server, sender};
}

Y_UNIT_TEST_TWIN(Upsert, EvWrite) {
auto [runtime, server, sender] = TestCreateServer();

auto opts = TShardedTableOptions();
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);

auto rows = EvWrite ? TEvWriteRows{{{0, 1}}, {{2, 3}}, {{4, 5}}} : TEvWriteRows{};
auto upsertObserver = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
auto upsertResultObserver = ReplaceEvProposeTransactionResultWithEvWrite(runtime, rows);

ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 3);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 5);"));

auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();

UNIT_ASSERT_VALUES_EQUAL(table1state, "key = 0, value = 1\n"
"key = 2, value = 3\n"
"key = 4, value = 5\n");
}

Y_UNIT_TEST(WriteImmediateOnShard) {
auto [runtime, server, sender] = TestCreateServer();

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/datashard/datashard_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/library/actors/core/event.h>
#include <ydb/core/protos/data_events.pb.h>
#include <ydb/core/protos/tx_datashard.pb.h>

#include <util/generic/ptr.h>

Expand All @@ -13,5 +14,7 @@ class Convertor {
public:
static ui64 GetTxId(const TAutoPtr<IEventHandle>& ev);
static ui64 GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode txMode);
static NKikimrDataEvents::TEvWrite::ETxMode GetTxMode(ui64 flags);
static NKikimrTxDataShard::TEvProposeTransactionResult::EStatus GetStatus(NKikimrDataEvents::TEvWriteResult::EStatus status);
};
}
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ bool TValidatedWriteTx::ParseRecord(const TDataShard::TTableInfos& tableInfos) {
auto tableInfoPtr = tableInfos.FindPtr(tableIdRecord.GetTableId());
if (!tableInfoPtr) {
ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR;
ErrStr = TStringBuilder() << "Table '" << tableIdRecord.GetTableId() << "' doesn't exist";
ErrStr = TStringBuilder() << "Table '" << tableIdRecord.GetTableId() << "' doesn't exist.";
return false;
}
TableInfo = tableInfoPtr->Get();
Expand Down
107 changes: 103 additions & 4 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1875,6 +1875,103 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sen
return Write(runtime, sender, shardId, std::move(request), expectedStatus, std::move(traceId));
}



TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) {
if (rows.empty())
return {};

return runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
if (event->GetTypeRewrite() != TEvDataShard::EvProposeTransaction)
return;

const auto& record = event->Get<TEvDataShard::TEvProposeTransaction>()->Record;

if (record.GetTxKind() != NKikimrTxDataShard::TX_KIND_DATA)
return;

// Parse original TEvProposeTransaction
const TString& txBody = record.GetTxBody();
NKikimrTxDataShard::TDataTransaction tx;
Y_VERIFY(tx.ParseFromArray(txBody.data(), txBody.size()));

// Construct new EvWrite
TVector<TCell> cells;
ui64 tableId = 0;
ui16 colCount = 0;
for (const auto& task : tx.GetKqpTransaction().GetTasks()) {
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
Y_VERIFY(task.GetMeta().UnpackTo(&meta));
if (!meta.HasWrites())
continue;

const auto& tableMeta = meta.GetTable();
Y_VERIFY_S(tableId == 0 || tableId == tableMeta.GetTableId().GetTableId(), "Only writes to one table is supported now");
tableId = tableMeta.GetTableId().GetTableId();
const auto& writes = meta.GetWrites();
Y_VERIFY_S(colCount == 0 || colCount == writes.GetColumns().size(), "Only equal column count is supported now.");
colCount = writes.GetColumns().size();

const auto& row = rows.ProcessNextRow();
Y_VERIFY(row.Cells.size() == colCount);
std::copy(row.Cells.begin(), row.Cells.end(), std::back_inserter(cells));
}

if (cells.empty()) {
Cerr << "TEvProposeTransaction TX_KIND_DATA has no writes.\n";
return;
}

Cerr << "TEvProposeTransaction TX_KIND_DATA event is observed and will be replaced with EvWrite: " << record.ShortDebugString() << Endl;

TSerializedCellMatrix matrix(cells, cells.size() / colCount, colCount);
TString blobData = matrix.ReleaseBuffer();

UNIT_ASSERT(blobData.size() < 8_MB);

ui64 txId = record.GetTxId();
auto txMode = NKikimr::NDataShard::EvWrite::Convertor::GetTxMode(record.GetFlags());
std::vector<ui32> columnIds(colCount);
std::iota(columnIds.begin(), columnIds.end(), 1);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

// Replace event
auto handle = new IEventHandle(event->Recipient, event->Sender, evWrite.release(), 0, event->Cookie);
handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite());
event.Reset(handle);
});
}

TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) {
if (rows.empty())
return {};

return runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
if (event->GetTypeRewrite() != NEvents::TDataEvents::EvWriteResult)
return;

rows.CompleteNextRow();

const auto& record = event->Get<NEvents::TDataEvents::TEvWriteResult>()->Record;
Cerr << "EvWriteResult event is observed and will be replaced with EvProposeTransactionResult: " << record.ShortDebugString() << Endl;

// Construct new EvProposeTransactionResult
ui64 txId = record.GetTxId();
ui64 origin = record.GetOrigin();
auto status = NKikimr::NDataShard::EvWrite::Convertor::GetStatus(record.GetStatus());

auto evResult = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA, origin, txId, status);

// Replace event
auto handle = new IEventHandle(event->Recipient, event->Sender, evResult.release(), 0, event->Cookie);
handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite());
event.Reset(handle);
});
}

void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values)
{
auto txTypes = std::make_shared<NTxProxy::TUploadTypes>();
Expand Down Expand Up @@ -1903,9 +2000,10 @@ void WaitTabletBecomesOffline(TServer::TPtr server, ui64 tabletId)
struct IsShardStateChange
{
IsShardStateChange(ui64 tabletId)
: TabletId(tabletId)
{
}
:
TabletId(tabletId)
{
}

bool operator()(IEventHandle& ev)
{
Expand Down Expand Up @@ -1959,7 +2057,8 @@ namespace {
, Snapshot(snapshot)
, Ordered(ordered)
, State(pause ? EState::PauseWait : EState::Normal)
{ }
{
}

void Bootstrap(const TActorContext& ctx) {
auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
Expand Down
39 changes: 39 additions & 0 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,45 @@ void ExecSQL(Tests::TServer::TPtr server,
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});

struct TEvWriteRow {
TEvWriteRow(std::initializer_list<ui32> init) {
for (ui32 value : init) {
Cells.emplace_back(TCell((const char*)&value, sizeof(ui32)));
}
}

std::vector<TCell> Cells;

enum EStatus {
Init,
Processing,
Completed
} Status = Init;
};
class TEvWriteRows : public std::vector<TEvWriteRow> {
public:
TEvWriteRows() = default;
TEvWriteRows(std::initializer_list<TEvWriteRow> init) :
std::vector<TEvWriteRow>(init) { }

const TEvWriteRow& ProcessNextRow() {
auto processedRow = std::find_if(begin(), end(), [](const auto& row) { return row.Status == TEvWriteRow::EStatus::Init; });
Y_VERIFY_S(processedRow != end(), "There should be at least one EvWrite row to process.");
processedRow->Status = TEvWriteRow::EStatus::Processing;
Cerr << "Processing next EvWrite row\n";
return *processedRow;
}
void CompleteNextRow() {
auto processedRow = std::find_if(begin(), end(), [](const auto& row) { return row.Status == TEvWriteRow::EStatus::Processing; });
Y_VERIFY_S(processedRow != end(), "There should be at lest one EvWrite row processing.");
processedRow->Status = TEvWriteRow::EStatus::Completed;
Cerr << "Completed next EvWrite row\n";
}
};

TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows);
TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows);

void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);

struct IsTxResultComplete {
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/actors/testlib/test_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ namespace NActors {
TEventObserverHolder& operator=(TEventObserverHolder&& other) noexcept {
if (this != &other)
{
Remove();

List = std::move(other.List);
Iter = std::move(other.Iter);

Expand Down

0 comments on commit db4e909

Please sign in to comment.