Skip to content

Commit

Permalink
Allow streams on sync-index table (ydb-platform#6842)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jul 26, 2024
1 parent 42b6cd6 commit 21f916f
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -800,8 +800,12 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat
.NotUnderDeleting()
.NotUnderOperation();

if (checks && !tablePath.IsInsideTableIndexPath()) {
checks.IsCommonSensePath();
if (checks) {
if (!tablePath.IsInsideTableIndexPath()) {
checks.IsCommonSensePath();
} else if (!tablePath.Parent().IsTableIndex(NKikimrSchemeOp::EIndexTypeGlobal)) {
return CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, "Cannot add changefeed to index table");
}
}

if (!checks) {
Expand Down
13 changes: 11 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1599,10 +1599,19 @@ bool TPath::IsInsideCdcStreamPath() const {
return true;
}

bool TPath::IsTableIndex() const {
bool TPath::IsTableIndex(const TMaybe<NKikimrSchemeOp::EIndexType>& type) const {
Y_ABORT_UNLESS(IsResolved());

return Base()->IsTableIndex();
if (!Base()->IsTableIndex()) {
return false;
}

if (!type.Defined()) {
return true;
}

Y_ABORT_UNLESS(SS->Indexes.contains(Base()->PathId));
return SS->Indexes.at(Base()->PathId)->Type == *type;
}

bool TPath::IsBackupTable() const {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_path.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include <ydb/core/protos/flat_tx_scheme.pb.h>

#include <util/generic/maybe.h>

namespace NKikimr::NSchemeShard {

class TSchemeShard;
Expand Down Expand Up @@ -159,7 +161,7 @@ class TPath {
bool AtLocalSchemeShardPath() const;
bool IsInsideTableIndexPath() const;
bool IsInsideCdcStreamPath() const;
bool IsTableIndex() const;
bool IsTableIndex(const TMaybe<NKikimrSchemeOp::EIndexType>& type = {}) const;
bool IsBackupTable() const;
bool IsAsyncReplicaTable() const;
bool IsCdcStream() const;
Expand Down
32 changes: 23 additions & 9 deletions ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1213,9 +1213,14 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
KeyColumnNames: ["key"]
}
IndexDescription {
Name: "Index"
Name: "SyncIndex"
KeyColumnNames: ["indexed"]
}
IndexDescription {
Name: "AsyncIndex"
KeyColumnNames: ["indexed"]
Type: EIndexTypeGlobalAsync
}
)");
env.TestWaitNotification(runtime, txId);

Expand All @@ -1228,7 +1233,16 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
)", {NKikimrScheme::StatusPathDoesNotExist});

TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/AsyncIndex", R"(
TableName: "indexImplTable"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
}
)", {NKikimrScheme::StatusPreconditionFailed});

TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"(
TableName: "indexImplTable"
StreamDescription {
Name: "Stream"
Expand All @@ -1238,10 +1252,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
)");
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {
NLs::PathExist,
});
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {
NLs::PathExist,
});

Expand All @@ -1251,14 +1265,14 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
Disable {}
)", {NKikimrScheme::StatusPathDoesNotExist});

TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"(
TableName: "indexImplTable"
StreamName: "Stream"
Disable {}
)");
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {
NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled),
});

Expand All @@ -1267,16 +1281,16 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamName: "Stream"
)", {NKikimrScheme::StatusPathDoesNotExist});

TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"(
TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"(
TableName: "indexImplTable"
StreamName: "Stream"
)");
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), {
NLs::PathNotExist,
});
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), {
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), {
NLs::PathNotExist,
});
}
Expand Down

0 comments on commit 21f916f

Please sign in to comment.