Skip to content

Commit

Permalink
Set & check replication config (ydb-platform#2649)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jun 6, 2024
1 parent 537ee67 commit 2f21c3d
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 3 deletions.
8 changes: 8 additions & 0 deletions ydb/core/protos/out/out.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ Y_DECLARE_OUT_SPEC(, NKikimrSchemeOp::ECdcStreamState, stream, value) {
stream << NKikimrSchemeOp::ECdcStreamState_Name(value);
}

Y_DECLARE_OUT_SPEC(, NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode, stream, value) {
stream << NKikimrSchemeOp::TTableReplicationConfig::EReplicationMode_Name(value);
}

Y_DECLARE_OUT_SPEC(, NKikimrSchemeOp::TTableReplicationConfig::EConsistency, stream, value) {
stream << NKikimrSchemeOp::TTableReplicationConfig::EConsistency_Name(value);
}

Y_DECLARE_OUT_SPEC(, NKikimrSubDomains::EServerlessComputeResourcesMode, stream, value) {
stream << NKikimrSubDomains::EServerlessComputeResourcesMode_Name(value);
}
Expand Down
31 changes: 30 additions & 1 deletion ydb/core/tx/replication/controller/dst_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {

// TODO: support indexed tables
TxBody.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
TxBody.MutableCreateTable()->SetName(ToString(ExtractBase(DstPath)));
auto& desc = *TxBody.MutableCreateTable();
desc.SetName(ToString(ExtractBase(DstPath)));
// TODO: support other modes
auto& replicationConfig = *desc.MutableReplicationConfig();
replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);

AllocateTxId();
}
Expand Down Expand Up @@ -249,6 +254,30 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
}

bool CheckTableScheme(const NKikimrSchemeOp::TTableDescription& got, TString& error) const {
if (!got.HasReplicationConfig()) {
error = "Empty replication config";
return false;
}

const auto& replicationConfig = got.GetReplicationConfig();

switch (replicationConfig.GetMode()) {
case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY:
break;
default:
error = "Unsupported replication mode";
return false;
}

switch (replicationConfig.GetConsistency()) {
case NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK:
break;
default:
error = TStringBuilder() << "Unsupported replication consistency"
<< ": " << static_cast<int>(replicationConfig.GetConsistency());
return false;
}

const auto& expected = TxBody.GetCreateTable();

// check key
Expand Down
55 changes: 55 additions & 0 deletions ydb/core/tx/replication/controller/dst_creator_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ Y_UNIT_TEST_SUITE(DstCreator) {

UNIT_ASSERT(FindIfPtr(tableDesc.Columns, pred));
}

const auto& replCfg = replicatedDesc.GetReplicationConfig();
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetMode(), NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
UNIT_ASSERT_VALUES_EQUAL(replCfg.GetConsistency(), NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
}

Y_UNIT_TEST(NonExistentSrc) {
Expand Down Expand Up @@ -191,6 +195,57 @@ Y_UNIT_TEST_SUITE(DstCreator) {
},
});
}

Y_UNIT_TEST(EmptyReplicationConfig) {
auto clearConfig = [](const TTestTableDescription& desc) {
auto copy = desc;
copy.ReplicationConfig.Clear();
return copy;
};

ExistingDst(NKikimrScheme::StatusSchemeError, "Empty replication config", clearConfig, TTestTableDescription{
.Name = "Table",
.KeyColumns = {"key"},
.Columns = {
{.Name = "key", .Type = "Uint32"},
{.Name = "value", .Type = "Utf8"},
},
});
}

Y_UNIT_TEST(UnsupportedReplicationMode) {
auto clearMode = [](const TTestTableDescription& desc) {
auto copy = desc;
copy.ReplicationConfig->Mode = TTestTableDescription::TReplicationConfig::MODE_NONE;
return copy;
};

ExistingDst(NKikimrScheme::StatusSchemeError, "Unsupported replication mode", clearMode, TTestTableDescription{
.Name = "Table",
.KeyColumns = {"key"},
.Columns = {
{.Name = "key", .Type = "Uint32"},
{.Name = "value", .Type = "Utf8"},
},
});
}

Y_UNIT_TEST(UnsupportedReplicationConsistency) {
auto changeConsistency = [](const TTestTableDescription& desc) {
auto copy = desc;
copy.ReplicationConfig->Consistency = TTestTableDescription::TReplicationConfig::CONSISTENCY_STRONG;
return copy;
};

ExistingDst(NKikimrScheme::StatusSchemeError, "Unsupported replication consistency", changeConsistency, TTestTableDescription{
.Name = "Table",
.KeyColumns = {"key"},
.Columns = {
{.Name = "key", .Type = "Uint32"},
{.Name = "value", .Type = "Utf8"},
},
});
}
}

}
37 changes: 35 additions & 2 deletions ydb/core/tx/replication/ut_helpers/test_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,39 @@ void TTestTableDescription::TColumn::SerializeTo(NKikimrSchemeOp::TColumnDescrip
proto.SetType(Type);
}

void TTestTableDescription::TReplicationConfig::SerializeTo(NKikimrSchemeOp::TTableReplicationConfig& proto) const {
switch (Mode) {
case MODE_NONE:
proto.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE);
break;
case MODE_READ_ONLY:
proto.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
break;
default:
Y_ABORT("Unexpected mode");
}

switch (Consistency) {
case CONSISTENCY_STRONG:
proto.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_STRONG);
break;
case CONSISTENCY_WEAK:
proto.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
break;
default:
Y_ABORT("Unexpected consistency");
}
}

TTestTableDescription::TReplicationConfig TTestTableDescription::TReplicationConfig::Default() {
return TReplicationConfig{
.Mode = MODE_READ_ONLY,
.Consistency = CONSISTENCY_WEAK,
};
}

void TTestTableDescription::SerializeTo(NKikimrSchemeOp::TTableDescription& proto) const {
proto.SetName(Name);
proto.MutableReplicationConfig()->SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
proto.MutableReplicationConfig()->SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);

for (const auto& keyColumn : KeyColumns) {
proto.AddKeyColumnNames(keyColumn);
Expand All @@ -21,6 +50,10 @@ void TTestTableDescription::SerializeTo(NKikimrSchemeOp::TTableDescription& prot
for (const auto& column : Columns) {
column.SerializeTo(*proto.AddColumns());
}

if (ReplicationConfig) {
ReplicationConfig->SerializeTo(*proto.MutableReplicationConfig());
}
}

THolder<NKikimrSchemeOp::TTableDescription> MakeTableDescription(const TTestTableDescription& desc) {
Expand Down
21 changes: 21 additions & 0 deletions ydb/core/tx/replication/ut_helpers/test_table.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#pragma once

#include <util/generic/maybe.h>
#include <util/generic/ptr.h>
#include <util/generic/string.h>
#include <util/generic/vector.h>

namespace NKikimrSchemeOp {
class TColumnDescription;
class TTableDescription;
class TTableReplicationConfig;
}

namespace NKikimr::NReplication::NTestHelpers {
Expand All @@ -19,9 +21,28 @@ struct TTestTableDescription {
void SerializeTo(NKikimrSchemeOp::TColumnDescription& proto) const;
};

struct TReplicationConfig {
enum EMode {
MODE_NONE = 0,
MODE_READ_ONLY = 1,
};

enum EConsistency {
CONSISTENCY_STRONG = 1,
CONSISTENCY_WEAK = 2,
};

EMode Mode;
EConsistency Consistency;

void SerializeTo(NKikimrSchemeOp::TTableReplicationConfig& proto) const;
static TReplicationConfig Default();
};

TString Name;
TVector<TString> KeyColumns;
TVector<TColumn> Columns;
TMaybe<TReplicationConfig> ReplicationConfig = TReplicationConfig::Default();

void SerializeTo(NKikimrSchemeOp::TTableDescription& proto) const;
};
Expand Down

0 comments on commit 2f21c3d

Please sign in to comment.