Skip to content

Commit

Permalink
Add end-to-end async replication test. (#6522)
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik authored Jul 10, 2024
1 parent a2e5443 commit b1baa2e
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 0 deletions.
116 changes: 116 additions & 0 deletions ydb/tests/functional/replication/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#include <util/system/env.h>
#include <library/cpp/testing/unittest/registar.h>

#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
#include <library/cpp/yson/writer.h>

#include <library/cpp/threading/local_executor/local_executor.h>

using namespace NYdb;
using namespace NYdb::NTable;

namespace {

TString ReformatYson(const TString& yson) {
TStringStream ysonInput(yson);
TStringStream output;
NYson::ReformatYsonStream(&ysonInput, &output, NYson::EYsonFormat::Text);
return output.Str();
}

void CompareYson(const TString& expected, const TString& actual) {
UNIT_ASSERT_NO_DIFF(ReformatYson(expected), ReformatYson(actual));
}

ui64 DoRead(TSession& s, const TString& table, ui64 expectedRows, const TString& expectedContent) {
auto res = s.ExecuteDataQuery(
Sprintf("SELECT * FROM `/local/%s`; SELECT COUNT(*) AS __count FROM `/local/%s`;",
table.data(), table.data()), TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
auto rs = NYdb::TResultSetParser(res.GetResultSet(1));
UNIT_ASSERT(rs.TryNextRow());
auto count = rs.ColumnParser("__count").GetUint64();

if (count == expectedRows) {
auto yson = NYdb::FormatResultSetYson(res.GetResultSet(0));

CompareYson(expectedContent, yson);
}

return count;
}

} // namespace

Y_UNIT_TEST_SUITE(Replication)
{
Y_UNIT_TEST(UuidValue)
{
TString connectionString = GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE");
Cerr << connectionString << Endl;
auto config = TDriverConfig(connectionString);
auto driver = TDriver(config);
auto tableClient = TTableClient(driver);
auto session = tableClient.GetSession().GetValueSync().GetSession();

{
auto res = session.ExecuteSchemeQuery(R"(
CREATE TABLE `/local/ProducerUuidValue` (
Key Uint32,
Value1 Uuid,
Value2 Uuid NOT NULL,
PRIMARY KEY (Key)
);
)").GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}

{
auto sessionResult = tableClient.GetSession().GetValueSync();
UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());
auto s = sessionResult.GetSession();

{
const TString query = "UPSERT INTO ProducerUuidValue (Key, Value1, Value2) VALUES"
"(1, "
"CAST(\"5b99a330-04ef-4f1a-9b64-ba6d5f44ea01\" as Uuid), "
"UNWRAP(CAST(\"5b99a330-04ef-4f1a-9b64-ba6d5f44ea02\" as Uuid)"
"));";
auto res = s.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}

{
const TString query = Sprintf("CREATE ASYNC REPLICATION `replication` FOR"
"`ProducerUuidValue` AS `ConsumerUuidValue`"
"WITH ("
"CONNECTION_STRING = 'grpc://%s',"
"TOKEN = 'root@builtin'"
");", connectionString.data());
auto res = s.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
// TODO: Make CREATE ASYNC REPLICATION to be a sync call
Sleep(TDuration::Seconds(10));
}

NYdb::NTable::TExecDataQuerySettings execSettings;
execSettings.KeepInQueryCache(true);

auto sessionResult = tableClient.GetSession().GetValueSync();
UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());

auto s = sessionResult.GetSession();
const TString expected = R"([[[1u];["5b99a330-04ef-4f1a-9b64-ba6d5f44ea01"];"5b99a330-04ef-4f1a-9b64-ba6d5f44ea02"]])";
ui32 attempt = 10;
while (1 != DoRead(s, "ConsumerUuidValue", 1, expected) && --attempt) {
Sleep(TDuration::Seconds(1));
}

UNIT_ASSERT_C(attempt, "Unable to wait replication result");
}
}

25 changes: 25 additions & 0 deletions ydb/tests/functional/replication/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
UNITTEST()

ENV(YDB_USE_IN_MEMORY_PDISKS=true)

ENV(YDB_ERASURE=block_4-2)

PEERDIR(
library/cpp/threading/local_executor
library/cpp/yson
ydb/public/sdk/cpp/client/ydb_table
ydb/public/sdk/cpp/client/draft
ydb/public/lib/yson_value
)

SRCS(
main.cpp
)

INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)

SIZE(MEDIUM)

REQUIREMENTS(ram:16)

END()
1 change: 1 addition & 0 deletions ydb/tests/functional/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ RECURSE(
query_cache
rename
restarts
replication
scheme_shard
scheme_tests
script_execution
Expand Down

0 comments on commit b1baa2e

Please sign in to comment.