Skip to content

Commit

Permalink
schemeboard: pass describe-result as an opaque payload
Browse files Browse the repository at this point in the history
Change type of `{TEvUpdate,TEvNotify}.DescribeSchemeResult` from transparent
`TEvDescribeSchemeResult` to opaque `bytes` and support that throughout
Populator, Replica, Subscriber actors.

A properly typed TEvDescribeSchemeResult induces additional overhead to
automatically serialize and deserialize this message when transferring over
the wire.
This performance cost is usually either negligible or imperceptible.
But in specific situations, particularly when rapidly updating partitioning
information for tables with a huge number of shards, this overhead could lead
to significant issues. Schemeboard replicas could get overloaded and become
unresponsive to further requests. This is problematic, especially considering
the schemeboard subsystem's critical role in servicing all databases within
a cluster, making it a SPOF.

The core realization is that the schemeboard components do not require
the full content of a TEvDescribeSchemeResult message to operate efficiently.
Instead, only a limited set of fields (path, path-id, version and info about
subdomain/database) is required for processing.
And a whole TEvDescribeSchemeResult could be passed through as an opaque payload.

Changing the field type from TEvDescribeSchemeResult to bytes without changing
the field number is a safe move: the actual value of the field remains unchanged
at the wire protocol level.
Thus, older implementations will interpret the payload as
a TEvDescribeSchemeResult message and proceed with deserialization as usual.
And newer implementations will recognize the data as a binary blob and will
deserialize it explicitly only when necessary.

KIKIMR-14948
  • Loading branch information
ijon committed Feb 20, 2024
1 parent 5ba8840 commit ba5f107
Show file tree
Hide file tree
Showing 20 changed files with 579 additions and 307 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/flat_tx_scheme.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ message TEvDescribeSchemeResult {
optional string LastExistedPrefixPath = 7;
optional fixed64 LastExistedPrefixPathId = 8;
optional NKikimrSchemeOp.TPathDescription LastExistedPrefixDescription = 9;

optional fixed64 PathOwnerId = 10;
}

Expand Down
74 changes: 63 additions & 11 deletions ydb/core/protos/scheme_board.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import "ydb/core/protos/flat_tx_scheme.proto";
import "ydb/core/scheme/protos/pathid.proto";

package NKikimrSchemeBoard;
option java_package = "ru.yandex.kikimr.proto";
Expand All @@ -13,22 +13,68 @@ message TEvHandshake {
optional uint64 Generation = 2;
}

// here and below
// Owner is the tablet id of schemeshard witch holds the records
// LocalPathId is a second part of TPathId
// PathOwnerId is a first part of TPathId
// Here and below.
// Owner is the tablet id of schemeshard which holds the records.
// (PathOwnerId, LocalPathId) constitute TPathId of the object.

// TEvUpdate.DescribeSchemeResultSerialized is a NKikimrScheme.TEvDescribeSchemeResult
// in the form of opaque payload.
// Originally, that field existed as a properly typed TEvDescribeSchemeResult message.
// However, that induced additional overhead to serialize and deserialize this message
// when transferring over the wire.
// This performance cost is usually either negligible or imperceptible.
// But in specific situations, particularly when rapidly updating partitioning information
// for tables with a huge number of shards, this overhead could lead to significant issues.
// Schemeboard replicas could get overloaded and become unresponsive to further requests.
// This is problematic, especially considering the schemeboard subsystem's critical role
// in servicing all databases within a cluster, making it a Single Point of Failure (SPOF).
//
// The core realization is that the schemeboard components do not require the full content of
// a TEvDescribeSchemeResult message to operate efficiently. Instead, only a limited set of
// fields (path, path-id, version and info about subdomain/database) is required for processing.
// And a whole TEvDescribeSchemeResult could be passed through as an opaque payload.
//
// Type change from TEvDescribeSchemeResult to bytes without changing field number
// is a safe move. Actual value of the field remains unchanged at the wire protocol level.
// Thus, older implementations will interpret the payload as a TEvDescribeSchemeResult message
// and proceed with deserialization as usual. And newer implementations will recognize the data
// as a binary blob and will deserialize it explicitly only when necessary.
//
// - Path
// - PathOwnerId, LocalPathId
// - PathDirEntryPathVersion
// - PathSubdomainPathId
// - PathAbandonedTenantsSchemeShards
// are taken from the original TEvDescribeSchemeResult (one way or another).
//
message TEvUpdate {
optional uint64 Owner = 1;
optional uint64 Generation = 2;
optional TLocalPathIdRange DeletedLocalPathIds = 3;
optional string Path = 4;
optional uint64 LocalPathId = 5;

optional string Path = 4; // extracted from DescribeSchemeResult.Path
optional uint64 LocalPathId = 5; // extracted from DescribeSchemeResult.PathId

optional bool IsDeletion = 6 [default = false];
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 7;

optional bytes DescribeSchemeResultSerialized = 7;

optional bool NeedAck = 8 [default = false];
optional uint64 PathOwnerId = 9;

optional uint64 PathOwnerId = 9; // extracted from DescribeSchemeResult.PathOwnerId, DescribeSchemeResult.PathDescription.Self.SchemeshardId in order of presence

optional TLocalPathIdRange MigratedLocalPathIds = 10;

// Explicit values extracted from DescribeSchemeResultSerialized

// DescribeSchemeResult.PathDescription.Self.PathVersion
optional uint64 PathDirEntryPathVersion = 11;

// DescribeSchemeResult.PathDescription.DomainDescription.DomainKey
optional NKikimrProto.TPathID PathSubdomainPathId = 13;

// DescribeSchemeResult.PathDescription.AbandonedTenantsSchemeShards
repeated uint64 PathAbandonedTenantsSchemeShards = 14;
}

message TEvUpdateAck {
Expand Down Expand Up @@ -65,16 +111,22 @@ message TEvUnsubscribe {
optional uint64 LocalPathId = 3;
}

// See comments for TEvUpdate.
message TEvNotify {
optional string Path = 1;
// and/or
optional uint64 PathOwnerId = 2;
optional uint64 LocalPathId = 3;
// common fields
optional bool IsDeletion = 4 [default = false];
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 5;
optional uint64 Version = 6;

optional bytes DescribeSchemeResultSerialized = 5;

optional uint64 Version = 6; // same as TEvUpdate.PathDirEntryPathVersion
optional bool Strong = 7 [default = false];

optional NKikimrProto.TPathID PathSubdomainPathId = 8;
repeated uint64 PathAbandonedTenantsSchemeShards = 9;
}

message TEvNotifyAck {
Expand Down
43 changes: 22 additions & 21 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
static THolder<TDataStreamsClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
return MakeHolder<TDataStreamsClient>(driver, NYdb::TCommonClientSettings().Database(database));
}
};
};

class TTestTopicEnv: public TTestEnv<TTestTopicEnv, NYdb::NTopic::TTopicClient> {
public:
Expand All @@ -798,7 +798,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
static THolder<NYdb::NTopic::TTopicClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
return MakeHolder<NYdb::NTopic::TTopicClient>(driver, NYdb::NTopic::TTopicClientSettings().Database(database));
}
};
};

TShardedTableOptions SimpleTable() {
return TShardedTableOptions()
Expand Down Expand Up @@ -1344,7 +1344,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 30);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
R"({"update":{},"key":[1]})",
R"({"update":{},"key":[2]})",
R"({"update":{},"key":[3]})",
Expand All @@ -1360,7 +1360,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 30);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1376,7 +1376,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 30);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
R"({"update":{"value":10},"key":[1]})",
R"({"update":{"value":20},"key":[2]})",
R"({"update":{"value":30},"key":[3]})",
Expand All @@ -1397,7 +1397,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
R"({"update":{},"newImage":{"value":10},"key":[1]})",
R"({"update":{},"newImage":{"value":20},"key":[2]})",
R"({"update":{},"newImage":{"value":30},"key":[3]})",
Expand All @@ -1421,7 +1421,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("c", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("c", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("c", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1445,7 +1445,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1456,7 +1456,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}

Y_UNIT_TEST(NewImageLogDebezium) {
Y_UNIT_TEST(NewImageLogDebezium) {
TopicRunner::Read(SimpleTable(), NewImage(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
(1, 10),
Expand All @@ -1469,7 +1469,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(3, 300);
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
)"}, {
{DebeziumBody("u", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
{DebeziumBody("u", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
{DebeziumBody("u", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
Expand All @@ -1486,7 +1486,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(1, 10),
(2, 20),
(3, 30);
)"}, {
)"}, {
R"({"update":{},"key":[1],"ts":"***"})",
R"({"update":{},"key":[2],"ts":"***"})",
R"({"update":{},"key":[3],"ts":"***"})",
Expand All @@ -1512,7 +1512,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData) VALUES (
1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}')
);
)"}, {
)"}, {
WriteJson(NJson::TJsonMap({
{"awsRegion", ""},
{"dynamodb", NJson::TJsonMap({
Expand Down Expand Up @@ -1541,7 +1541,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
);
)", R"(
DELETE FROM `/Root/Table` WHERE __Hash = 1;
)"}, {
)"}, {
WriteJson(NJson::TJsonMap({
{"awsRegion", ""},
{"dynamodb", NJson::TJsonMap({
Expand Down Expand Up @@ -1639,7 +1639,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
(1, 0.0%s/0.0%s),
(2, 1.0%s/0.0%s),
(3, -1.0%s/0.0%s);
)", s, s, s, s, s, s)}, {
)", s, s, s, s, s, s)}, {
R"({"update":{"value":"nan"},"key":[1]})",
R"({"update":{"value":"inf"},"key":[2]})",
R"({"update":{"value":"-inf"},"key":[3]})",
Expand Down Expand Up @@ -1674,7 +1674,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
TopicRunner::Read(table, KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {Sprintf(R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
("%s", 1);
)", key.c_str())}, {
)", key.c_str())}, {
{DebeziumBody("u", nullptr, nullptr), {{"__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())}}},
});
}
Expand Down Expand Up @@ -2043,7 +2043,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
UPSERT INTO `/Root/TableAux` (key, value)
VALUES (1, 10);
)");
)");

SetSplitMergePartCountLimit(&runtime, -1);
const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
Expand Down Expand Up @@ -2292,7 +2292,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);

WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4));

// execute on old partitions
Expand Down Expand Up @@ -2376,7 +2376,8 @@ Y_UNIT_TEST_SUITE(Cdc) {

case TSchemeBoardEvents::EvUpdate:
if (auto* msg = ev->Get<TSchemeBoardEvents::TEvUpdate>()) {
const auto desc = msg->GetRecord().GetDescribeSchemeResult();
NKikimrScheme::TEvDescribeSchemeResult desc;
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(desc, msg->GetRecord().GetDescribeSchemeResultSerialized()));
if (desc.GetPath() == "/Root/Table/Stream" && desc.GetPathDescription().GetSelf().GetCreateFinished()) {
delayed.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
Expand Down Expand Up @@ -2446,7 +2447,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
UPSERT INTO `/Root/Table` (key, value)
VALUES (1, 10);
)");
)");

SetSplitMergePartCountLimit(&runtime, -1);
const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
Expand Down Expand Up @@ -3266,7 +3267,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);

WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4));

// merge
Expand Down Expand Up @@ -3298,7 +3299,7 @@ template <>
void Out<std::pair<TString, TString>>(IOutputStream& output, const std::pair<TString, TString>& x) {
output << x.first << ":" << x.second;
}

void AppendToString(TString& dst, const std::pair<TString, TString>& x) {
TStringOutput output(dst);
output << x;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/scheme_board/cache_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class TCacheTest: public TTestWithSchemeshard {
" Kind: \"pool-kind-1\" "
"} "
" Name: \"Root\" ");

// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NLog::PRI_DEBUG);
}

UNIT_TEST_SUITE(TCacheTest);
Expand Down
Loading

0 comments on commit ba5f107

Please sign in to comment.