diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index f77afe446e6e..426e6faf03dd 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -686,8 +686,12 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran .NotUnderDeleting() .NotUnderOperation(); - if (checks && !tablePath.IsInsideTableIndexPath()) { - checks.IsCommonSensePath(); + if (checks) { + if (!tablePath.IsInsideTableIndexPath()) { + checks.IsCommonSensePath(); + } else if (!tablePath.Parent().IsTableIndex(NKikimrSchemeOp::EIndexTypeGlobal)) { + return CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, "Cannot add changefeed to index table"); + } } if (!checks) { diff --git a/ydb/core/tx/schemeshard/schemeshard_path.cpp b/ydb/core/tx/schemeshard/schemeshard_path.cpp index 24dffd138e53..0d6a519a52e4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path.cpp @@ -1567,10 +1567,19 @@ bool TPath::IsInsideCdcStreamPath() const { return true; } -bool TPath::IsTableIndex() const { +bool TPath::IsTableIndex(const TMaybe& type) const { Y_ABORT_UNLESS(IsResolved()); - return Base()->IsTableIndex(); + if (!Base()->IsTableIndex()) { + return false; + } + + if (!type.Defined()) { + return true; + } + + Y_ABORT_UNLESS(SS->Indexes.contains(Base()->PathId)); + return SS->Indexes.at(Base()->PathId)->Type == *type; } bool TPath::IsBackupTable() const { diff --git a/ydb/core/tx/schemeshard/schemeshard_path.h b/ydb/core/tx/schemeshard/schemeshard_path.h index cc6e9cffd7b0..0b105bab607a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.h +++ b/ydb/core/tx/schemeshard/schemeshard_path.h @@ -5,6 +5,8 @@ #include +#include + namespace NKikimr::NSchemeShard { class TSchemeShard; @@ -157,7 +159,7 @@ class TPath { bool AtLocalSchemeShardPath() const; bool IsInsideTableIndexPath() const; bool IsInsideCdcStreamPath() const; - bool IsTableIndex() const; + bool IsTableIndex(const TMaybe& type = {}) const; bool IsBackupTable() const; bool IsAsyncReplicaTable() const; bool IsCdcStream() const; diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index e7bac0b143de..672c4233e1da 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -1213,9 +1213,14 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { KeyColumnNames: ["key"] } IndexDescription { - Name: "Index" + Name: "SyncIndex" KeyColumnNames: ["indexed"] } + IndexDescription { + Name: "AsyncIndex" + KeyColumnNames: ["indexed"] + Type: EIndexTypeGlobalAsync + } )"); env.TestWaitNotification(runtime, txId); @@ -1228,7 +1233,16 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } )", {NKikimrScheme::StatusPathDoesNotExist}); - TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/AsyncIndex", R"( + TableName: "indexImplTable" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )", {NKikimrScheme::StatusPreconditionFailed}); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"( TableName: "indexImplTable" StreamDescription { Name: "Stream" @@ -1238,10 +1252,10 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), { NLs::PathExist, }); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), { NLs::PathExist, }); @@ -1251,14 +1265,14 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { Disable {} )", {NKikimrScheme::StatusPathDoesNotExist}); - TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TestAlterCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"( TableName: "indexImplTable" StreamName: "Stream" Disable {} )"); env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), { NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled), }); @@ -1267,16 +1281,16 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamName: "Stream" )", {NKikimrScheme::StatusPathDoesNotExist}); - TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/Index", R"( + TestDropCdcStream(runtime, ++txId, "/MyRoot/Table/SyncIndex", R"( TableName: "indexImplTable" StreamName: "Stream" )"); env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream"), { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), { NLs::PathNotExist, }); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/Stream/streamImpl"), { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream/streamImpl"), { NLs::PathNotExist, }); }