Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Sep 25, 2024
1 parent 92a1247 commit 53620be
Show file tree
Hide file tree
Showing 10 changed files with 531 additions and 502 deletions.
16 changes: 8 additions & 8 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,12 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
if (conclusionParse.IsFail()) {
sendError(conclusionParse.GetErrorMessage(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
} else {
if (commitOperation->NeedSyncLocks()) {
auto* lockInfo = OperationsManager->GetLockOptional(commitOperation->GetLockId());
if (!lockInfo) {
sendError("haven't lock for commit: " + ::ToString(commitOperation->GetLockId()),
NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED);
} else {
auto* lockInfo = OperationsManager->GetLockOptional(commitOperation->GetLockId());
if (!lockInfo) {
sendError("haven't lock for commit: " + ::ToString(commitOperation->GetLockId()),
NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
} else {
if (commitOperation->NeedSyncLocks()) {
if (lockInfo->GetGeneration() != commitOperation->GetGeneration()) {
sendError("tablet lock have another generation: " + ::ToString(lockInfo->GetGeneration()) +
" != " + ::ToString(commitOperation->GetGeneration()),
Expand All @@ -477,9 +477,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
} else {
Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
}
} else {
Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
}
} else {
Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
}
}
return;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString
return (res.GetStatus() == NKikimrTxColumnShard::PREPARED);
}

void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap) {
void PlanSchemaTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap) {
auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
auto tx = plan->Record.AddTransactions();
tx->SetTxId(snap.GetTxId());
Expand All @@ -78,7 +78,7 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot
UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::SUCCESS);
}

void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult) {
void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap, bool waitResult) {
auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
auto tx = plan->Record.AddTransactions();
tx->SetTxId(snap.GetTxId());
Expand Down Expand Up @@ -229,7 +229,7 @@ void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, con
PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds);
}

void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId) {
void Wakeup(TTestBasicRuntime& runtime, const TActorId& sender, const ui64 shardId) {
auto wakeup = std::make_unique<TEvPrivate::TEvPeriodicWakeup>(true);
ForwardToTablet(runtime, shardId, sender, wakeup.release());
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,9 @@ struct TTestSchema {

bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap);
void ProvideTieringSnapshot(TTestBasicRuntime& runtime, const TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot);
void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap);
void PlanSchemaTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap);

void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);
void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);

bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data,
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>* writeIds,
Expand Down Expand Up @@ -435,7 +435,7 @@ inline void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planSt
PlanCommit(runtime, sender, planStep, ids);
}

void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId);
void Wakeup(TTestBasicRuntime& runtime, const TActorId& sender, const ui64 shardId);

struct TTestBlobOptions {
THashSet<TString> NullColumns;
Expand Down
101 changes: 101 additions & 0 deletions ydb/core/tx/columnshard/test_helper/shard_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#include "shard_reader.h"

namespace NKikimr::NTxUT {

std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TShardReader::BuildStartEvent() const {
auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
ev->Record.SetLocalPathId(PathId);
ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep());
ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId());

ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL);
ev->Record.SetTxId(Snapshot.GetTxId());

ev->Record.SetReverse(Reverse);
ev->Record.SetItemsLimit(Limit);

ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW);

auto protoRanges = ev->Record.MutableRanges();
protoRanges->Reserve(Ranges.size());
for (auto& range : Ranges) {
auto newRange = protoRanges->Add();
range.Serialize(*newRange);
}

if (ProgramProto) {
NKikimrSSA::TOlapProgram olapProgram;
{
TString programBytes;
TStringOutput stream(programBytes);
ProgramProto->SerializeToArcadiaStream(&stream);
olapProgram.SetProgram(programBytes);
}
{
TString programBytes;
TStringOutput stream(programBytes);
olapProgram.SerializeToArcadiaStream(&stream);
ev->Record.SetOlapProgram(programBytes);
}
ev->Record.SetOlapProgramType(NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
} else if (SerializedProgram) {
ev->Record.SetOlapProgram(*SerializedProgram);
ev->Record.SetOlapProgramType(NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
}

return ev;
}

NKikimr::NTxUT::TShardReader& TShardReader::SetReplyColumns(const std::vector<TString>& replyColumns) {
AFL_VERIFY(!SerializedProgram);
if (!ProgramProto) {
ProgramProto = NKikimrSSA::TProgram();
}
for (auto&& command : *ProgramProto->MutableCommand()) {
if (command.HasProjection()) {
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumns) {
proj.AddColumns()->SetName(i);
}
*command.MutableProjection() = proj;
return *this;
}
}
{
auto* command = ProgramProto->AddCommand();
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumns) {
proj.AddColumns()->SetName(i);
}
*command->MutableProjection() = proj;
}
return *this;
}

NKikimr::NTxUT::TShardReader& TShardReader::SetReplyColumnIds(const std::vector<ui32>& replyColumnIds) {
AFL_VERIFY(!SerializedProgram);
if (!ProgramProto) {
ProgramProto = NKikimrSSA::TProgram();
}
for (auto&& command : *ProgramProto->MutableCommand()) {
if (command.HasProjection()) {
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumnIds) {
proj.AddColumns()->SetId(i);
}
*command.MutableProjection() = proj;
return *this;
}
}
{
auto* command = ProgramProto->AddCommand();
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumnIds) {
proj.AddColumns()->SetId(i);
}
*command->MutableProjection() = proj;
}
return *this;
}

}
100 changes: 3 additions & 97 deletions ydb/core/tx/columnshard/test_helper/shard_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,53 +28,7 @@ class TShardReader {
std::vector<TString> ReplyColumns;
std::vector<TSerializedTableRange> Ranges;

std::unique_ptr<TEvDataShard::TEvKqpScan> BuildStartEvent() const {
auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
ev->Record.SetLocalPathId(PathId);
ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep());
ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId());

ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL);
ev->Record.SetTxId(Snapshot.GetTxId());

ev->Record.SetReverse(Reverse);
ev->Record.SetItemsLimit(Limit);

ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW);

auto protoRanges = ev->Record.MutableRanges();
protoRanges->Reserve(Ranges.size());
for (auto& range : Ranges) {
auto newRange = protoRanges->Add();
range.Serialize(*newRange);
}

if (ProgramProto) {
NKikimrSSA::TOlapProgram olapProgram;
{
TString programBytes;
TStringOutput stream(programBytes);
ProgramProto->SerializeToArcadiaStream(&stream);
olapProgram.SetProgram(programBytes);
}
{
TString programBytes;
TStringOutput stream(programBytes);
olapProgram.SerializeToArcadiaStream(&stream);
ev->Record.SetOlapProgram(programBytes);
}
ev->Record.SetOlapProgramType(
NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
);
} else if (SerializedProgram) {
ev->Record.SetOlapProgram(*SerializedProgram);
ev->Record.SetOlapProgramType(
NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
);
}

return ev;
}
std::unique_ptr<TEvDataShard::TEvKqpScan> BuildStartEvent() const;

std::vector<std::shared_ptr<arrow::RecordBatch>> ResultBatches;
YDB_READONLY(ui32, IterationsCount, 0);
Expand All @@ -100,57 +54,9 @@ class TShardReader {
return r ? r->num_rows() : 0;
}

TShardReader& SetReplyColumns(const std::vector<TString>& replyColumns) {
AFL_VERIFY(!SerializedProgram);
if (!ProgramProto) {
ProgramProto = NKikimrSSA::TProgram();
}
for (auto&& command : *ProgramProto->MutableCommand()) {
if (command.HasProjection()) {
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumns) {
proj.AddColumns()->SetName(i);
}
*command.MutableProjection() = proj;
return *this;
}
}
{
auto* command = ProgramProto->AddCommand();
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumns) {
proj.AddColumns()->SetName(i);
}
*command->MutableProjection() = proj;
}
return *this;
}
TShardReader& SetReplyColumns(const std::vector<TString>& replyColumns);

TShardReader& SetReplyColumnIds(const std::vector<ui32>& replyColumnIds) {
AFL_VERIFY(!SerializedProgram);
if (!ProgramProto) {
ProgramProto = NKikimrSSA::TProgram();
}
for (auto&& command : *ProgramProto->MutableCommand()) {
if (command.HasProjection()) {
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumnIds) {
proj.AddColumns()->SetId(i);
}
*command.MutableProjection() = proj;
return *this;
}
}
{
auto* command = ProgramProto->AddCommand();
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumnIds) {
proj.AddColumns()->SetId(i);
}
*command->MutableProjection() = proj;
}
return *this;
}
TShardReader& SetReplyColumnIds(const std::vector<ui32>& replyColumnIds);

TShardReader& SetProgram(const NKikimrSSA::TProgram& p) {
AFL_VERIFY(!ProgramProto);
Expand Down
63 changes: 63 additions & 0 deletions ydb/core/tx/columnshard/test_helper/shard_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include "shard_writer.h"

#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/testlib/tablet_helpers.h>
#include <ydb/core/tx/columnshard/defs.h>
#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/data_events/payload_helper.h>

namespace NKikimr::NTxUT {

NKikimrDataEvents::TEvWriteResult::EStatus TShardWriter::StartCommit(const ui64 txId) {
auto evCommit = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
evCommit->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
auto* lock = evCommit->Record.MutableLocks()->AddLocks();
lock->SetLockId(LockId);
ForwardToTablet(Runtime, TTestTxConfig::TxTablet0, Sender, evCommit.release());

TAutoPtr<NActors::IEventHandle> handle;
auto event = Runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
AFL_VERIFY(event);

return event->Record.GetStatus();
}

NKikimrDataEvents::TEvWriteResult::EStatus TShardWriter::Abort(const ui64 txId) {
auto evCommit = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
evCommit->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Rollback);
auto* lock = evCommit->Record.MutableLocks()->AddLocks();
lock->SetLockId(LockId);
ForwardToTablet(Runtime, TTestTxConfig::TxTablet0, Sender, evCommit.release());

TAutoPtr<NActors::IEventHandle> handle;
auto event = Runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
AFL_VERIFY(event);

return event->Record.GetStatus();
}

NKikimrDataEvents::TEvWriteResult::EStatus TShardWriter::Write(
const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<ui32>& columnIds, const ui64 txId) {
TString blobData = NArrow::SerializeBatchNoCompression(batch);
// AFL_VERIFY(blobData.size() < NColumnShard::TLimits::GetMaxBlobSize());

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
evWrite->SetTxId(txId);
evWrite->SetLockId(LockId, LockNodeId);
const ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, { OwnerId, PathId, SchemaVersion }, columnIds,
payloadIndex, NKikimrDataEvents::FORMAT_ARROW);

ForwardToTablet(Runtime, TabletId, Sender, evWrite.release());

TAutoPtr<NActors::IEventHandle> handle;
auto event = Runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
AFL_VERIFY(event);

AFL_VERIFY(event->Record.GetOrigin() == PathId);
AFL_VERIFY(event->Record.GetTxId() == LockId);

return event->Record.GetStatus();
}

} // namespace NKikimr::NTxUT
Loading

0 comments on commit 53620be

Please sign in to comment.