Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Feb 24, 2025
1 parent afbaa97 commit aaace33
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 65 deletions.
117 changes: 58 additions & 59 deletions ydb/core/tx/replication/service/table_writer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ namespace NKikimr::NReplication::NService {

Y_UNIT_TEST_SUITE(LocalTableWriter) {
using namespace NTestHelpers;
using TRecord = TEvWorker::TEvData::TRecord;

TRecord Record(ui64 offset, const TString& data) {
return TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */);
TEvWorker::TEvData::TRecord TRecord(ui64 offset, const TString& data) {
return TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */);
}

Y_UNIT_TEST(WriteTable) {
Expand All @@ -40,9 +39,9 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());

env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(1, R"({"key":[1], "update":{"value":"10"}})"),
Record(2, R"({"key":[2], "update":{"value":"20"}})"),
Record(3, R"({"key":[3], "update":{"value":"30"}})"),
TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
}));
}

Expand Down Expand Up @@ -97,37 +96,37 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());

env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(1, R"({"key":[1], "update":{"int32_value":-100500}})"),
Record(2, R"({"key":[2], "update":{"uint32_value":100500}})"),
Record(3, R"({"key":[3], "update":{"int64_value":-200500}})"),
Record(4, R"({"key":[4], "update":{"uint64_value":200500}})"),
Record(5, R"({"key":[5], "update":{"uint8_value":255}})"),
Record(6, R"({"key":[6], "update":{"bool_value":true}})"),
Record(7, R"({"key":[7], "update":{"double_value":1.1234}})"),
Record(8, R"({"key":[8], "update":{"float_value":-1.123}})"),
Record(9, R"({"key":[9], "update":{"date_value":"2020-08-12T00:00:00.000000Z"}})"),
Record(10, R"({"key":[10], "update":{"datetime_value":"2020-08-12T12:34:56.000000Z"}})"),
Record(11, R"({"key":[11], "update":{"timestamp_value":"2020-08-12T12:34:56.123456Z"}})"),
Record(12, R"({"key":[12], "update":{"interval_value":-300500}})"),
Record(13, R"({"key":[13], "update":{"decimal_value":"3.321"}})"),
Record(14, R"({"key":[14], "update":{"dynumber_value":".3321e1"}})"),
Record(15, Sprintf(R"({"key":[15], "update":{"string_value":"%s"}})", Base64Encode("lorem ipsum").c_str())),
Record(16, R"({"key":[16], "update":{"utf8_value":"lorem ipsum"}})"),
Record(17, R"({"key":[17], "update":{"json_value":{"key": "value"}}})"),
Record(18, R"({"key":[18], "update":{"jsondoc_value":{"key": "value"}}})"),
Record(19, R"({"key":[19], "update":{"uuid_value":"65df1ec1-a97d-47b2-ae56-3c023da6ee8c"}})"),
Record(20, R"({"key":[20], "update":{"date32_value":18486}})"),
Record(21, R"({"key":[21], "update":{"datetime64_value":1597235696}})"),
Record(22, R"({"key":[22], "update":{"timestamp64_value":1597235696123456}})"),
Record(23, R"({"key":[23], "update":{"interval64_value":-300500}})"),
Record(24, R"({"key":[24], "update":{"pgint2_value":"-42"}})"),
Record(25, R"({"key":[25], "update":{"pgint4_value":"-420"}})"),
Record(26, R"({"key":[26], "update":{"pgint8_value":"-4200"}})"),
Record(27, R"({"key":[27], "update":{"pgfloat4_value":"3.1415"}})"),
Record(28, R"({"key":[28], "update":{"pgfloat8_value":"2.718"}})"),
Record(29, R"({"key":[29], "update":{"pgbytea_value":"\\x6c6f72656d2022697073756d22"}})"),
Record(30, R"({"key":[30], "update":{"pgtext_value":"lorem \"ipsum\""}})"),
Record(31, R"({"key":[31], "update":{"decimal35_value":"355555555555555.321"}})"),
TRecord(1, R"({"key":[1], "update":{"int32_value":-100500}})"),
TRecord(2, R"({"key":[2], "update":{"uint32_value":100500}})"),
TRecord(3, R"({"key":[3], "update":{"int64_value":-200500}})"),
TRecord(4, R"({"key":[4], "update":{"uint64_value":200500}})"),
TRecord(5, R"({"key":[5], "update":{"uint8_value":255}})"),
TRecord(6, R"({"key":[6], "update":{"bool_value":true}})"),
TRecord(7, R"({"key":[7], "update":{"double_value":1.1234}})"),
TRecord(8, R"({"key":[8], "update":{"float_value":-1.123}})"),
TRecord(9, R"({"key":[9], "update":{"date_value":"2020-08-12T00:00:00.000000Z"}})"),
TRecord(10, R"({"key":[10], "update":{"datetime_value":"2020-08-12T12:34:56.000000Z"}})"),
TRecord(11, R"({"key":[11], "update":{"timestamp_value":"2020-08-12T12:34:56.123456Z"}})"),
TRecord(12, R"({"key":[12], "update":{"interval_value":-300500}})"),
TRecord(13, R"({"key":[13], "update":{"decimal_value":"3.321"}})"),
TRecord(14, R"({"key":[14], "update":{"dynumber_value":".3321e1"}})"),
TRecord(15, Sprintf(R"({"key":[15], "update":{"string_value":"%s"}})", Base64Encode("lorem ipsum").c_str())),
TRecord(16, R"({"key":[16], "update":{"utf8_value":"lorem ipsum"}})"),
TRecord(17, R"({"key":[17], "update":{"json_value":{"key": "value"}}})"),
TRecord(18, R"({"key":[18], "update":{"jsondoc_value":{"key": "value"}}})"),
TRecord(19, R"({"key":[19], "update":{"uuid_value":"65df1ec1-a97d-47b2-ae56-3c023da6ee8c"}})"),
TRecord(20, R"({"key":[20], "update":{"date32_value":18486}})"),
TRecord(21, R"({"key":[21], "update":{"datetime64_value":1597235696}})"),
TRecord(22, R"({"key":[22], "update":{"timestamp64_value":1597235696123456}})"),
TRecord(23, R"({"key":[23], "update":{"interval64_value":-300500}})"),
TRecord(24, R"({"key":[24], "update":{"pgint2_value":"-42"}})"),
TRecord(25, R"({"key":[25], "update":{"pgint4_value":"-420"}})"),
TRecord(26, R"({"key":[26], "update":{"pgint8_value":"-4200"}})"),
TRecord(27, R"({"key":[27], "update":{"pgfloat4_value":"3.1415"}})"),
TRecord(28, R"({"key":[28], "update":{"pgfloat8_value":"2.718"}})"),
TRecord(29, R"({"key":[29], "update":{"pgbytea_value":"\\x6c6f72656d2022697073756d22"}})"),
TRecord(30, R"({"key":[30], "update":{"pgtext_value":"lorem \"ipsum\""}})"),
TRecord(31, R"({"key":[31], "update":{"decimal35_value":"355555555555555.321"}})"),
}));
}

Expand All @@ -148,9 +147,9 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());

env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(1, R"({"key":["1.0"], "update":{"value":"155555555555555.321"}})"),
Record(2, R"({"key":["2.0"], "update":{"value":"255555555555555.321"}})"),
Record(3, R"({"key":["3.0"], "update":{"value":"355555555555555.321"}})"),
TRecord(1, R"({"key":["1.0"], "update":{"value":"155555555555555.321"}})"),
TRecord(2, R"({"key":["2.0"], "update":{"value":"255555555555555.321"}})"),
TRecord(3, R"({"key":["3.0"], "update":{"value":"355555555555555.321"}})"),
}));
}

Expand Down Expand Up @@ -189,9 +188,9 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {

{
auto ev = env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0,"TestSource", {
Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"),
Record(order++, R"({"key":[3], "update":{"value":"30"}, "ts":[3,0]})"),
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"),
TRecord(order++, R"({"key":[3], "update":{"value":"30"}, "ts":[3,0]})"),
}));

const auto& versions = ev->Get()->Record.GetVersions();
Expand All @@ -208,17 +207,17 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
}
{
auto ev = env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(order++, R"({"resolved":[10,0]})"),
TRecord(order++, R"({"resolved":[10,0]})"),
}));
UNIT_ASSERT_VALUES_EQUAL(TRowVersion::FromProto(ev->Get()->Record.GetVersion()), TRowVersion(10, 0));
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
}

env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"),
Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"),
Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"),
Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[22,0]})"),
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"),
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"),
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"),
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[22,0]})"),
}));

env.Send<TEvWorker::TEvPoll>(writer, MakeTxIdResult({
Expand All @@ -227,12 +226,12 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
}));

env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[13,0]})"),
Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[23,0]})"),
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[13,0]})"),
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[23,0]})"),
}));

env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(order++, R"({"resolved":[30,0]})"),
TRecord(order++, R"({"resolved":[30,0]})"),
}));
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
}
Expand Down Expand Up @@ -299,8 +298,8 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {

env.Send<TEvWorker::TEvHandshake>(worker, new TEvWorker::TEvHandshake());
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
Record(2, R"({"key":[2], "update":{"value":"20"}, "ts":[11,0]})"),
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
TRecord(2, R"({"key":[2], "update":{"value":"20"}, "ts":[11,0]})"),
}));
env.SendAsync(writer, MakeTxIdResult({
{TRowVersion(10, 0), 1},
Expand Down Expand Up @@ -382,8 +381,8 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {

env.Send<TEvWorker::TEvHandshake>(worker, new TEvWorker::TEvHandshake());
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
Record(2, R"({"resolved":[10,0]})"),
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
TRecord(2, R"({"resolved":[10,0]})"),
}));
env.Send<TEvService::TEvHeartbeat>(writer, MakeTxIdResult({
{TRowVersion(10, 0), 1},
Expand Down Expand Up @@ -412,16 +411,16 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());

env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
}));
env.Send<TEvWorker::TEvPoll>(writer, MakeTxIdResult({
{TRowVersion(10, 0), 1},
}));

env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"),
Record(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"),
Record(4, R"({"resolved":[20,0]})"),
TRecord(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"),
TRecord(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"),
TRecord(4, R"({"resolved":[20,0]})"),
}));
env.Send<TEvService::TEvHeartbeat>(writer, MakeTxIdResult({
{TRowVersion(20, 0), 2},
Expand Down
11 changes: 5 additions & 6 deletions ydb/core/tx/replication/service/transfer_writer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ namespace NKikimr::NReplication::NService {

Y_UNIT_TEST_SUITE(TransferWriter) {
using namespace NTestHelpers;
using TRecord = TEvWorker::TEvData::TRecord;

TRecord Record(ui64 offset, const TString& data) {
return TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */);
TEvWorker::TEvData::TRecord TRecord(ui64 offset, const TString& data) {
return TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */);
}

Y_UNIT_TEST(Write_ColumnTable) {
Expand Down Expand Up @@ -56,9 +55,9 @@ Y_UNIT_TEST_SUITE(TransferWriter) {
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());

env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
Record(1, R"({"key":[1], "update":{"value":"10"}})"),
Record(2, R"({"key":[2], "update":{"value":"20"}})"),
Record(3, R"({"key":[3], "update":{"value":"30"}})"),
TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
}));
}

Expand Down

0 comments on commit aaace33

Please sign in to comment.