diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index f2995a5b441f..f969813d5b57 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -90,6 +90,10 @@ void TNodeWarden::RemoveDrivesWithBadSerialsAndReport(TVector TNodeWarden::ListLocalDrives() { + if (!AppData()->FeatureFlags.GetEnableDriveSerialsDiscovery()) { + return {}; + } + TStringStream details; TVector drives = ListDevicesWithPartlabel(details); diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp index 45f7f4250a02..b1b4e794e86e 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp @@ -174,6 +174,8 @@ namespace NKikimr { TEvResumeForce *ResumeForceToken = nullptr; TInstant ReplicationEndTime; bool UnrecoveredNonphantomBlobs = false; + bool RequestedReplicationToken = false; + bool HoldingReplicationToken = false; TWatchdogTimer ReplProgressWatchdog; @@ -287,6 +289,12 @@ namespace NKikimr { case Plan: // this is a first quantum of replication, so we have to register it in the broker State = AwaitToken; + Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken); + if (RequestedReplicationToken) { + STLOG(PRI_CRIT, BS_REPL, BSVR38, ReplCtx->VCtx->VDiskLogPrefix << "excessive replication token requested"); + break; + } + RequestedReplicationToken = true; if (!Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(ReplCtx->VDiskCfg->BaseInfo.PDiskId))) { HandleReplToken(); } @@ -303,6 +311,10 @@ namespace NKikimr { } void HandleReplToken() { + Y_ABORT_UNLESS(RequestedReplicationToken); + RequestedReplicationToken = false; + HoldingReplicationToken = true; + // switch to replication state Transition(AwaitToken, Replication); if (!ResumeIfReady()) { @@ -408,6 +420,9 @@ namespace NKikimr { if (State == WaitQueues || State == Replication) { // release token as we have finished replicating Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken); + Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken); + Y_DEBUG_ABORT_UNLESS(HoldingReplicationToken); + HoldingReplicationToken = false; } ResetReplProgressTimer(true); @@ -635,7 +650,15 @@ namespace NKikimr { // return replication token if we have one if (State == AwaitToken || State == WaitQueues || State == Replication) { - Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken); + Y_DEBUG_ABORT_UNLESS(RequestedReplicationToken || HoldingReplicationToken); + if (RequestedReplicationToken || HoldingReplicationToken) { + Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken); + } + } else { + Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken && !HoldingReplicationToken); + if (RequestedReplicationToken || HoldingReplicationToken) { + STLOG(PRI_CRIT, BS_REPL, BSVR37, ReplCtx->VCtx->VDiskLogPrefix << "stuck replication token"); + } } if (ReplJobActorId) { diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp index b37ce712e0c5..0a8569e1241f 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp @@ -129,6 +129,7 @@ namespace NKikimr { ui64 NextReceiveCookie; TResultQueue ResultQueue; std::shared_ptr Tracker = std::make_shared(); + bool Terminated = false; TQueue> SchedulerRequestQ; THashMap RequestTokens; @@ -227,9 +228,7 @@ namespace NKikimr { PrefetchDataSize = 0; RequestFromVDiskProxyPending = false; if (Finished) { - Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue); - RequestTokens.clear(); - return PassAway(); // TODO(alexvru): check correctness of invocations + return PassAway(); } } // send request(s) if prefetch queue is not full @@ -297,6 +296,9 @@ namespace NKikimr { if (msg->Record.GetCookie() == NextReceiveCookie) { ui64 cookie = NextReceiveCookie; ProcessResult(msg); + if (Terminated) { + return; + } ReleaseMemToken(cookie); while (!ResultQueue.empty()) { const TQueueItem& top = ResultQueue.top(); @@ -305,6 +307,9 @@ namespace NKikimr { } ui64 cookie = NextReceiveCookie; ProcessResult(top.get()); + if (Terminated) { + return; + } ReleaseMemToken(cookie); ResultQueue.pop(); } @@ -314,6 +319,7 @@ namespace NKikimr { } void ReleaseMemToken(ui64 cookie) { + Y_ABORT_UNLESS(!Terminated); if (RequestTokens) { auto it = RequestTokens.find(cookie); Y_ABORT_UNLESS(it != RequestTokens.end()); @@ -428,6 +434,13 @@ namespace NKikimr { } } + void PassAway() override { + Y_ABORT_UNLESS(!Terminated); + Terminated = true; + Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue); + TActorBootstrapped::PassAway(); + } + STRICT_STFUNC(StateFunc, hFunc(TEvReplProxyNext, Handle) hFunc(TEvReplMemToken, Handle) @@ -446,8 +459,7 @@ namespace NKikimr { TTrackableVector&& ids, const TVDiskID& vdiskId, const TActorId& serviceId) - : TActorBootstrapped() - , ReplCtx(std::move(replCtx)) + : ReplCtx(std::move(replCtx)) , GType(ReplCtx->VCtx->Top->GType) , Ids(std::move(ids)) , VDiskId(vdiskId) diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index 519995c69179..0e1b96aa1511 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -136,7 +136,8 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig { Y_UNUSED(config); - if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) + if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE && + *txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO) return false; if (txCtx.GetSnapshot().IsValid()) diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index a668c0ea4977..af3ff88c35dc 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -212,8 +212,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { break; case Ydb::Table::TransactionSettings::kSnapshotReadOnly: - // TODO: (KIKIMR-3374) Use separate isolation mode to avoid optimistic locks. - EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; + EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO; Readonly = true; break; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 896507804bab..950e6d4d5c1b 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1039,8 +1039,7 @@ class TKqpExecuterBase : public TActorBootstrapped { std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) { // Special case for infinity if (lhs.Ranges->GetRightBorder().first->GetCells().empty() || rhs.Ranges->GetRightBorder().first->GetCells().empty()) { - YQL_ENSURE(!lhs.Ranges->GetRightBorder().first->GetCells().empty() || !rhs.Ranges->GetRightBorder().first->GetCells().empty()); - return rhs.Ranges->GetRightBorder().first->GetCells().empty(); + return !lhs.Ranges->GetRightBorder().first->GetCells().empty(); } return CompareTypedCellVectors( lhs.Ranges->GetRightBorder().first->GetCells().data(), diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 0df145f00af0..7ad66678f3cd 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -187,8 +187,8 @@ std::unique_ptr TKqpPlanner::SerializeReque request.SetTxId(TxId); if (LockTxId) { request.SetLockTxId(*LockTxId); + request.SetLockNodeId(LockNodeId); } - request.SetLockNodeId(LockNodeId); ActorIdToProto(ExecuterId, request.MutableExecuterActorId()); if (Deadline) { diff --git a/ydb/core/kqp/gateway/behaviour/view/manager.cpp b/ydb/core/kqp/gateway/behaviour/view/manager.cpp index 069697d53baf..f7d421a790ae 100644 --- a/ydb/core/kqp/gateway/behaviour/view/manager.cpp +++ b/ydb/core/kqp/gateway/behaviour/view/manager.cpp @@ -60,6 +60,7 @@ void FillCreateViewProposal(NKikimrSchemeOp::TModifyScheme& modifyScheme, const auto pathPair = SplitPathByDb(settings.GetObjectId(), context.GetDatabase()); modifyScheme.SetWorkingDir(pathPair.first); modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateView); + modifyScheme.SetFailedOnAlreadyExists(!settings.GetExistingOk()); auto& viewDesc = *modifyScheme.MutableCreateView(); viewDesc.SetName(pathPair.second); @@ -77,6 +78,7 @@ void FillDropViewProposal(NKikimrSchemeOp::TModifyScheme& modifyScheme, const auto pathPair = SplitPathByObjectId(settings.GetObjectId()); modifyScheme.SetWorkingDir(pathPair.first); modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropView); + modifyScheme.SetSuccessOnNotExist(settings.GetMissingOk()); auto& drop = *modifyScheme.MutableDrop(); drop.SetName(pathPair.second); @@ -84,9 +86,12 @@ void FillDropViewProposal(NKikimrSchemeOp::TModifyScheme& modifyScheme, NThreading::TFuture SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request, TActorSystem* actorSystem, - bool failOnAlreadyExists) { + bool failedOnAlreadyExists, + bool successOnNotExist) { const auto promiseScheme = NThreading::NewPromise(); - IActor* const requestHandler = new TSchemeOpRequestHandler(request, promiseScheme, failOnAlreadyExists); + IActor* const requestHandler = new TSchemeOpRequestHandler( + request, promiseScheme, failedOnAlreadyExists, successOnNotExist + ); actorSystem->Register(requestHandler); return promiseScheme.GetFuture().Apply([](const NThreading::TFuture& opResult) { if (opResult.HasValue()) { @@ -109,7 +114,12 @@ NThreading::TFuture CreateView(const NYql::TCreateObjectSe auto& schemeTx = *proposal->Record.MutableTransaction()->MutableModifyScheme(); FillCreateViewProposal(schemeTx, settings, context.GetExternalData()); - return SendSchemeRequest(proposal.Release(), context.GetExternalData().GetActorSystem(), true); + return SendSchemeRequest( + proposal.Release(), + context.GetExternalData().GetActorSystem(), + schemeTx.GetFailedOnAlreadyExists(), + schemeTx.GetSuccessOnNotExist() + ); } NThreading::TFuture DropView(const NYql::TDropObjectSettings& settings, @@ -122,7 +132,12 @@ NThreading::TFuture DropView(const NYql::TDropObjectSettin auto& schemeTx = *proposal->Record.MutableTransaction()->MutableModifyScheme(); FillDropViewProposal(schemeTx, settings); - return SendSchemeRequest(proposal.Release(), context.GetExternalData().GetActorSystem(), false); + return SendSchemeRequest( + proposal.Release(), + context.GetExternalData().GetActorSystem(), + schemeTx.GetFailedOnAlreadyExists(), + schemeTx.GetSuccessOnNotExist() + ); } void PrepareCreateView(NKqpProto::TKqpSchemeOperation& schemeOperation, @@ -214,10 +229,10 @@ NThreading::TFuture TViewManager::ExecutePrepared(const NK switch (schemeOperation.GetOperationCase()) { case NKqpProto::TKqpSchemeOperation::kCreateView: schemeTx.CopyFrom(schemeOperation.GetCreateView()); - return SendSchemeRequest(proposal.Release(), context.GetActorSystem(), true); + break; case NKqpProto::TKqpSchemeOperation::kDropView: schemeTx.CopyFrom(schemeOperation.GetDropView()); - return SendSchemeRequest(proposal.Release(), context.GetActorSystem(), false); + break; default: return NThreading::MakeFuture(TYqlConclusionStatus::Fail( TStringBuilder() @@ -226,6 +241,12 @@ NThreading::TFuture TViewManager::ExecutePrepared(const NK ) ); } + return SendSchemeRequest( + proposal.Release(), + context.GetActorSystem(), + schemeTx.GetFailedOnAlreadyExists(), + schemeTx.GetSuccessOnNotExist() + ); } } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp index 29722d23a386..99fd7c51cbe3 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp @@ -193,6 +193,18 @@ TMaybeNode ExtractTopSortKeySelector(TExprBase node, const NYql::TPar return {}; } +bool IsIdLambda(TExprBase body) { + if (auto cond = body.Maybe()) { + if (auto boolLit = cond.Cast().Predicate().Maybe()) { + return boolLit.Literal().Cast().Value() == "true" && cond.Value().Maybe(); + } + } + if (body.Maybe()) { + return true; + } + return false; +} + } // namespace TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, @@ -305,7 +317,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx const NYql::TKikimrTableDescription & tableDesc) -> TIndexComparisonKey { return std::make_tuple( - keySelector.IsValid() && IsSortKeyPrimary(keySelector.Cast(), tableDesc), + keySelector.IsValid() && IsSortKeyPrimary(keySelector.Cast(), tableDesc) && IsIdLambda(TCoLambda(buildResult.PrunedLambda).Body()), buildResult.PointPrefixLen >= descriptionKeyColumns, buildResult.PointPrefixLen >= descriptionKeyColumns ? 0 : buildResult.PointPrefixLen, buildResult.UsedPrefixLen >= descriptionKeyColumns, diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index cd478e6cdac0..718a45473d00 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -4062,24 +4062,111 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { Y_UNIT_TEST(AutoChooseIndexOrderByLimit) { TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetIndexAutoChooseMode(NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode_ONLY_POINTS); + appConfig.MutableTableServiceConfig()->SetIndexAutoChooseMode(NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode_MAX_USED_PREFIX); settings.SetAppConfig(appConfig); TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - CreateSampleTablesWithIndex(session); + { + auto session = db.CreateSession().GetValueSync().GetSession(); + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE `/Root/ComplexKey` ( + Key1 Int32, + Key2 Int32, + Key3 Int32, + Value Int32, + PRIMARY KEY (Key1, Key2, Key3), + INDEX Index GLOBAL ON (Key2) + ); + )").GetValueSync()); + + auto result2 = session.ExecuteDataQuery(R"( + REPLACE INTO `/Root/ComplexKey` (Key1, Key2, Key3, Value) VALUES + (1, 1, 101, 1), + (2, 2, 102, 1), + (2, 2, 103, 3), + (3, 3, 103, 2); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result2.IsSuccess(), result2.GetIssues().ToString()); + } + + NYdb::NTable::TExecDataQuerySettings querySettings; + querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile); + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + SELECT Key1, Key2, Key3 FROM `/Root/ComplexKey` + WHERE Key1 = 2 and Key2 = 2; + )", TTxControl::BeginTx(TTxSettings::SerializableRW()), querySettings).GetValueSync(); + AssertSuccessResult(result); + AssertTableReads(result, "/Root/ComplexKey/Index/indexImplTable", 2); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + SELECT Key1, Key2, Key3 FROM `/Root/ComplexKey` + WHERE Key1 = 2 and Key2 = 2 + ORDER BY Key1 DESC + LIMIT 1; + )", TTxControl::BeginTx(TTxSettings::SerializableRW()), querySettings).GetValueSync(); + AssertSuccessResult(result); + AssertTableReads(result, "/Root/ComplexKey/Index/indexImplTable", 0); + } + } + + Y_UNIT_TEST(AutoChooseIndexOrderByLambda) { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetIndexAutoChooseMode(NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode_MAX_USED_PREFIX); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + { + auto session = db.CreateSession().GetValueSync().GetSession(); + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE `/Root/ComplexKey` ( + Key Int32, + Fk Int32, + Value String, + PRIMARY KEY (Key, Fk), + INDEX Index GLOBAL ON (Value) + ); + )").GetValueSync()); + + auto result2 = session.ExecuteDataQuery(R"( + REPLACE INTO `/Root/ComplexKey` (Key, Fk, Value) VALUES + (null, null, "NullValue"), + (1, 101, "Value1"), + (2, 102, "Value1"), + (2, 103, "Value3"), + (3, 103, "Value2"), + (4, 104, "Value2"), + (5, 105, "Value3"); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result2.IsSuccess(), result2.GetIssues().ToString()); + } NYdb::NTable::TExecDataQuerySettings querySettings; querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile); auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT Fk, Key FROM `/Root/SecondaryKeys` WHERE Fk = 1 ORDER BY Key DESC LIMIT 1; + SELECT Key, Fk, Value FROM `/Root/ComplexKey` + WHERE Key = 2 + ORDER BY Value DESC + LIMIT 1; )", TTxControl::BeginTx(TTxSettings::SerializableRW()), querySettings).GetValueSync(); AssertSuccessResult(result); - AssertTableReads(result, "/Root/SecondaryKeys/Index/indexImplTable", 0); + AssertTableReads(result, "/Root/ComplexKey", 2); } Y_UNIT_TEST(MultipleBroadcastJoin) { diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 17a1046439a1..bf366f11b928 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -4147,6 +4147,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CheckDirEntry(kikimr, entriesToCheck); } } + Y_UNIT_TEST(CreateOrDropTopicOverTable) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); @@ -4218,6 +4219,65 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } + Y_UNIT_TEST(AlterCdcTopic) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr{serverSettings}; + auto tableClient = kikimr.GetTableClient(); + + { + auto tcSession = tableClient.CreateSession().GetValueSync().GetSession(); + UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/TmpTable` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )").GetValueSync().IsSuccess()); + + UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"( + ALTER TABLE `/Root/TmpTable` ADD CHANGEFEED `feed` WITH ( + MODE = 'KEYS_ONLY', FORMAT = 'JSON' + ); + )").GetValueSync().IsSuccess()); + tcSession.Close(); + } + + auto pq = NYdb::NTopic::TTopicClient(kikimr.GetDriver(), + NYdb::NTopic::TTopicClientSettings().Database("/Root").AuthToken("root@builtin")); + + auto client = kikimr.GetQueryClient(NYdb::NQuery::TClientSettings{}.AuthToken("root@builtin")); + auto session = client.GetSession().GetValueSync().GetSession(); + { + + const auto query = Q_(R"( + --!syntax_v1 + ALTER TOPIC `/Root/TmpTable/feed` ADD CONSUMER consumer21; + )"); + + RunQuery(query, session); + auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync(); + const auto& consumers = desc.GetTopicDescription().GetConsumers(); + UNIT_ASSERT_VALUES_EQUAL(consumers.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(consumers[0].GetConsumerName(), "consumer21"); + + } + { + const auto query = Q_(R"( + --!syntax_v1 + ALTER TOPIC `/Root/TmpTable/feed` SET (min_active_partitions = 10); + )"); + RunQuery(query, session, false); + auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 1); + } + + } + Y_UNIT_TEST(TableSink_OlapRWQueries) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); diff --git a/ydb/core/kqp/ut/view/view_ut.cpp b/ydb/core/kqp/ut/view/view_ut.cpp index ba7aca97e589..cb55869e4bf2 100644 --- a/ydb/core/kqp/ut/view/view_ut.cpp +++ b/ydb/core/kqp/ut/view/view_ut.cpp @@ -344,10 +344,60 @@ Y_UNIT_TEST_SUITE(TCreateAndDropViewTest) { { const auto creationResult = session.ExecuteSchemeQuery(creationQuery).GetValueSync(); UNIT_ASSERT(!creationResult.IsSuccess()); - UNIT_ASSERT(creationResult.GetIssues().ToString().Contains("error: path exist, request accepts it")); + UNIT_ASSERT_STRING_CONTAINS(creationResult.GetIssues().ToString(), "error: path exist, request accepts it"); } } + Y_UNIT_TEST(CreateViewOccupiedName) { + TKikimrRunner kikimr(TKikimrSettings().SetWithSampleTables(false)); + EnableViewsFeatureFlag(kikimr); + auto session = kikimr.GetQueryClient().GetSession().ExtractValueSync().GetSession(); + + constexpr const char* path = "table"; + + const TString createTable = std::format(R"( + CREATE TABLE {} (key Int32, value Utf8, PRIMARY KEY (key)); + )", path + ); + ExecuteQuery(session, createTable); + + auto checkError = [&session](const TString& query, const TString& expectedError) { + const auto result = session.ExecuteQuery(query, NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), expectedError); + }; + + const TString queryTemplate = std::format(R"( + CREATE VIEW {{}}{} WITH (security_invoker = true) AS SELECT 1; + )", path + ); + const TString expectedError = std::format("path: '/Root/{}', error: unexpected path type", path); + + for (std::string existenceCheck : {"", "IF NOT EXISTS "}) { + const TString createView = std::vformat(queryTemplate, std::make_format_args(existenceCheck)); + checkError(createView, expectedError); + } + } + + Y_UNIT_TEST(CreateViewIfNotExists) { + TKikimrRunner kikimr(TKikimrSettings().SetWithSampleTables(false)); + EnableViewsFeatureFlag(kikimr); + auto session = kikimr.GetQueryClient().GetSession().ExtractValueSync().GetSession(); + + constexpr const char* path = "/Root/TheView"; + constexpr const char* queryInView = "SELECT 1"; + + const TString creationQuery = std::format(R"( + CREATE VIEW IF NOT EXISTS `{}` WITH (security_invoker = true) AS {}; + )", + path, + queryInView + ); + ExecuteQuery(session, creationQuery); + // an attempt to create a duplicate does not produce an error + ExecuteQuery(session, creationQuery); + } + Y_UNIT_TEST(DropView) { TKikimrRunner kikimr(TKikimrSettings().SetWithSampleTables(false)); EnableViewsFeatureFlag(kikimr); @@ -399,6 +449,42 @@ Y_UNIT_TEST_SUITE(TCreateAndDropViewTest) { UNIT_ASSERT_STRING_CONTAINS(dropResult.GetIssues().ToString(), "Error: Views are disabled"); } + Y_UNIT_TEST(DropNonexistingView) { + TKikimrRunner kikimr(TKikimrSettings().SetWithSampleTables(false)); + EnableViewsFeatureFlag(kikimr); + auto session = kikimr.GetQueryClient().GetSession().ExtractValueSync().GetSession(); + + const auto dropResult = session.ExecuteQuery( + "DROP VIEW NonexistingView;", NQuery::TTxControl::NoTx() + ).ExtractValueSync(); + + UNIT_ASSERT(!dropResult.IsSuccess()); + UNIT_ASSERT_STRING_CONTAINS(dropResult.GetIssues().ToString(), "Error: Path does not exist"); + } + + Y_UNIT_TEST(CallDropViewOnTable) { + TKikimrRunner kikimr(TKikimrSettings().SetWithSampleTables(false)); + EnableViewsFeatureFlag(kikimr); + auto session = kikimr.GetQueryClient().GetSession().ExtractValueSync().GetSession(); + + constexpr const char* path = "table"; + + const TString createTable = std::format(R"( + CREATE TABLE {} (key Int32, value Utf8, PRIMARY KEY (key)); + )", path + ); + ExecuteQuery(session, createTable); + + auto checkError = [&session](const TString& query, const TString& expectedError) { + const auto result = session.ExecuteQuery(query, NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), expectedError); + }; + const TString expectedError = std::format("path: '/Root/{}', error: path is not a view", path); + checkError(std::format("DROP VIEW {};", path), expectedError); + checkError(std::format("DROP VIEW IF EXISTS {};", path), expectedError); + } + Y_UNIT_TEST(DropSameViewTwice) { TKikimrRunner kikimr(TKikimrSettings().SetWithSampleTables(false)); EnableViewsFeatureFlag(kikimr); @@ -424,10 +510,36 @@ Y_UNIT_TEST_SUITE(TCreateAndDropViewTest) { { const auto dropResult = session.ExecuteSchemeQuery(dropQuery).GetValueSync(); UNIT_ASSERT(!dropResult.IsSuccess()); - UNIT_ASSERT(dropResult.GetIssues().ToString().Contains("Error: Path does not exist")); + UNIT_ASSERT_STRING_CONTAINS(dropResult.GetIssues().ToString(), "Error: Path does not exist"); } } + Y_UNIT_TEST(DropViewIfExists) { + TKikimrRunner kikimr(TKikimrSettings().SetWithSampleTables(false)); + EnableViewsFeatureFlag(kikimr); + auto session = kikimr.GetQueryClient().GetSession().ExtractValueSync().GetSession(); + + constexpr const char* path = "/Root/TheView"; + constexpr const char* queryInView = "SELECT 1"; + + const TString creationQuery = std::format(R"( + CREATE VIEW `{}` WITH (security_invoker = true) AS {}; + )", + path, + queryInView + ); + ExecuteQuery(session, creationQuery); + + const TString dropQuery = std::format(R"( + DROP VIEW IF EXISTS `{}`; + )", + path + ); + ExecuteQuery(session, dropQuery); + // an attempt to drop an already deleted view does not produce an error + ExecuteQuery(session, dropQuery); + } + Y_UNIT_TEST(DropViewInFolder) { TKikimrRunner kikimr(TKikimrSettings().SetWithSampleTables(false)); EnableViewsFeatureFlag(kikimr); diff --git a/ydb/core/mind/hive/hive_impl.cpp b/ydb/core/mind/hive/hive_impl.cpp index 9c250e144647..55dada0352af 100644 --- a/ydb/core/mind/hive/hive_impl.cpp +++ b/ydb/core/mind/hive/hive_impl.cpp @@ -96,7 +96,7 @@ void THive::RestartPipeTx(ui64 tabletId) { } bool THive::TryToDeleteNode(TNodeInfo* node) { - if (node->CanBeDeleted()) { + if (node->CanBeDeleted(TActivationContext::Now())) { BLOG_I("TryToDeleteNode(" << node->Id << "): deleting"); DeleteNode(node->Id); return true; @@ -120,12 +120,15 @@ void THive::Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev) { void THive::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev) { if (ev->Get()->TabletId == TabletID()) { BLOG_TRACE("Handle TEvTabletPipe::TEvServerDisconnected(" << ev->Get()->ClientId << ") " << ev->Get()->ServerId); - TNodeInfo* node = FindNode(ev->Get()->ClientId.NodeId()); + auto nodeId = ev->Get()->ClientId.NodeId(); + TNodeInfo* node = FindNode(nodeId); if (node != nullptr) { Erase(node->PipeServers, ev->Get()->ServerId); if (node->PipeServers.empty() && node->IsUnknown()) { ObjectDistributions.RemoveNode(*node); - TryToDeleteNode(node); + if (TryToDeleteNode(node)) { + Execute(CreateDeleteNode(nodeId)); + } } } } @@ -1705,6 +1708,14 @@ void THive::UpdateCounterPingQueueSize() { } } +void THive::UpdateCounterTabletsStarting(i64 tabletsStartingDiff) { + if (TabletCounters != nullptr) { + auto& counter = TabletCounters->Simple()[NHive::COUNTER_TABLETS_STARTING]; + auto newValue = counter.Get() + tabletsStartingDiff; + counter.Set(newValue); + } +} + void THive::RecordTabletMove(const TTabletMoveInfo& moveInfo) { TabletMoveHistory.PushBack(moveInfo); TabletCounters->Cumulative()[NHive::COUNTER_TABLETS_MOVED].Increment(1); @@ -3384,13 +3395,16 @@ void THive::Handle(TEvPrivate::TEvLogTabletMoves::TPtr&) { } void THive::Handle(TEvPrivate::TEvDeleteNode::TPtr& ev) { - auto node = FindNode(ev->Get()->NodeId); + auto nodeId = ev->Get()->NodeId; + auto node = FindNode(nodeId); if (node == nullptr) { return; } node->DeletionScheduled = false; if (!node->IsAlive()) { - TryToDeleteNode(node); + if (TryToDeleteNode(node)) { + Execute(CreateDeleteNode(nodeId)); + } } } diff --git a/ydb/core/mind/hive/hive_impl.h b/ydb/core/mind/hive/hive_impl.h index 24edd21f968d..4cadc764ac56 100644 --- a/ydb/core/mind/hive/hive_impl.h +++ b/ydb/core/mind/hive/hive_impl.h @@ -301,6 +301,7 @@ class THive : public TActor, public TTabletExecutedFlat, public THiveShar ITransaction* CreateRequestTabletOwners(TEvHive::TEvRequestTabletOwners::TPtr event); ITransaction* CreateUpdateTabletsObject(TEvHive::TEvUpdateTabletsObject::TPtr event); ITransaction* CreateUpdateDomain(TSubDomainKey subdomainKey, TEvHive::TEvUpdateDomain::TPtr event = {}); + ITransaction* CreateDeleteNode(TNodeId nodeId); public: TDomainsView DomainsView; @@ -653,6 +654,7 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId void UpdateCounterEventQueueSize(i64 eventQueueSizeDiff); void UpdateCounterNodesConnected(i64 nodesConnectedDiff); void UpdateCounterPingQueueSize(); + void UpdateCounterTabletsStarting(i64 tabletsStartingDiff); void RecordTabletMove(const TTabletMoveInfo& info); bool DomainHasNodes(const TSubDomainKey &domainKey) const; void ProcessBootQueue(); diff --git a/ydb/core/mind/hive/hive_ut.cpp b/ydb/core/mind/hive/hive_ut.cpp index 18417ce97eeb..4bfa4031dee3 100644 --- a/ydb/core/mind/hive/hive_ut.cpp +++ b/ydb/core/mind/hive/hive_ut.cpp @@ -1069,6 +1069,119 @@ Y_UNIT_TEST_SUITE(THiveTest) { UNIT_ASSERT(!isNodeEmpty(nodeId)); } + Y_UNIT_TEST(DrainWithHiveRestart) { + // 1. Drain a node + // 2. Kill it & wait for hive to delete it + // 3. Start the node again + // 4. Restart hive + // 5. Ensure node is not down (by creating tablets) + const int NUM_NODES = 3; + const int NUM_TABLETS = 10; + TTestBasicRuntime runtime(NUM_NODES, false); + Setup(runtime, true, 2, [](TAppPrepare& app) { + app.HiveConfig.SetNodeDeletePeriod(1); + }); + const ui64 hiveTablet = MakeDefaultHiveID(); + const ui64 testerTablet = MakeTabletID(false, 1); + const TActorId hiveActor = CreateTestBootstrapper(runtime, CreateTestTabletInfo(hiveTablet, TTabletTypes::Hive), &CreateDefaultHive); + runtime.EnableScheduleForActor(hiveActor); + { + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvLocal::EvStatus, NUM_NODES); + runtime.DispatchEvents(options); + } + TTabletTypes::EType tabletType = TTabletTypes::Dummy; + std::unordered_set tablets; + TActorId senderA = runtime.AllocateEdgeActor(0); + auto createTablets = [&] { + for (int i = 0; i < NUM_TABLETS; ++i) { + THolder ev(new TEvHive::TEvCreateTablet(testerTablet, 100500 + tablets.size(), tabletType, BINDED_CHANNELS)); + runtime.SendToPipe(hiveTablet, senderA, ev.Release(), 0, GetPipeConfigWithRetries()); + TAutoPtr handle; + auto createTabletReply = runtime.GrabEdgeEventRethrow(handle); + ui64 tabletId = createTabletReply->Record.GetTabletID(); + tablets.insert(tabletId); + } + NTabletPipe::TClientConfig pipeConfig; + pipeConfig.RetryPolicy = NTabletPipe::TClientRetryPolicy::WithRetries(); + for (TTabletId tabletId : tablets) { + MakeSureTabletIsUp(runtime, tabletId, 0, &pipeConfig); + } + }; + + createTablets(); + + ui32 nodeIdx = 0; + ui32 nodeId = runtime.GetNodeId(nodeIdx); + { + Ctest << "1. Drain a node\n"; + + runtime.SendToPipe(hiveTablet, senderA, new TEvHive::TEvDrainNode(nodeId)); + + Ctest << "2. Kill it & wait for hive to delete it\n"; + + SendKillLocal(runtime, nodeIdx); + { + TDispatchOptions options; + options.FinalEvents.emplace_back(NHive::TEvPrivate::EvDeleteNode); + runtime.DispatchEvents(options); + runtime.AdvanceCurrentTime(TDuration::Seconds(2)); + runtime.DispatchEvents(options); + } + } + + auto isNodeEmpty = [&](ui32 nodeId) -> bool { + bool empty = true; + TAutoPtr handle; + TActorId whiteboard = NNodeWhiteboard::MakeNodeWhiteboardServiceId(nodeId); + runtime.Send(new IEventHandle(whiteboard, senderA, new NNodeWhiteboard::TEvWhiteboard::TEvTabletStateRequest())); + NNodeWhiteboard::TEvWhiteboard::TEvTabletStateResponse* wbResponse = runtime.GrabEdgeEventRethrow(handle); + for (const NKikimrWhiteboard::TTabletStateInfo& tabletInfo : wbResponse->Record.GetTabletStateInfo()) { + if (tablets.contains(tabletInfo.GetTabletId()) && tabletInfo.GetState() != NKikimrWhiteboard::TTabletStateInfo::Dead) { + Ctest << "Tablet " << tabletInfo.GetTabletId() << "." << tabletInfo.GetFollowerId() + << " is not dead yet (" << NKikimrWhiteboard::TTabletStateInfo::ETabletState_Name(tabletInfo.GetState()) << ")" << Endl; + empty = false; + } + } + return empty; + }; + + Ctest << "3. Start the node again\n"; + CreateLocal(runtime, nodeIdx); + + { + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvLocal::EvStatus); + runtime.DispatchEvents(options); + } + + Ctest << "4. Restart hive\n"; + + runtime.Register(CreateTabletKiller(hiveTablet)); + { + TDispatchOptions options; + std::unordered_set nodesConnected; + auto observer = runtime.AddObserver([&](auto&& ev) { nodesConnected.insert(ev->Sender.NodeId()); }); + auto waitFor = [&](const auto& condition, const TString& description) { + while (!condition()) { + Ctest << "waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + runtime.DispatchEvents(options); + } + }; + waitFor([&](){return nodesConnected.size() == NUM_NODES; }, "nodes to connect"); + } + + Ctest << "5. Ensure node is not down (by creating tablets)\n"; + + createTablets(); + + UNIT_ASSERT(!isNodeEmpty(nodeId)); + } + Y_UNIT_TEST(TestCreateSubHiveCreateTablet) { TTestBasicRuntime runtime(1, false); Setup(runtime, true); diff --git a/ydb/core/mind/hive/monitoring.cpp b/ydb/core/mind/hive/monitoring.cpp index 2a74db07926b..f442726a910f 100644 --- a/ydb/core/mind/hive/monitoring.cpp +++ b/ydb/core/mind/hive/monitoring.cpp @@ -246,7 +246,7 @@ class TTxMonEvent_MemStateTablets : public TTransactionBase { if (WaitingOnly) { tabletIdIndex.reserve(Self->BootQueue.WaitQueue.size()); for (const TBootQueue::TBootQueueRecord& rec : Self->BootQueue.WaitQueue) { - TTabletInfo* tablet = Self->FindTablet(rec.TabletId); + TTabletInfo* tablet = Self->FindTablet(rec.TabletId, rec.FollowerId); if (tablet != nullptr) { tabletIdIndex.push_back({tabletIndexFunction(*tablet), tablet}); } diff --git a/ydb/core/mind/hive/node_info.cpp b/ydb/core/mind/hive/node_info.cpp index 6acc1853abae..341e309b94ef 100644 --- a/ydb/core/mind/hive/node_info.cpp +++ b/ydb/core/mind/hive/node_info.cpp @@ -467,7 +467,7 @@ TResourceRawValues TNodeInfo::GetStDevResourceValues() { return GetStDev(values); } -bool TNodeInfo::CanBeDeleted() const { +bool TNodeInfo::CanBeDeleted(TInstant now) const { TInstant lastAlive(TInstant::MilliSeconds(Statistics.GetLastAliveTimestamp())); if (lastAlive) { return (IsDisconnected() || IsUnknown()) @@ -475,7 +475,7 @@ bool TNodeInfo::CanBeDeleted() const { && GetTabletsTotal() == 0 && LockedTablets.empty() && !Freeze - && (lastAlive + Hive.GetNodeDeletePeriod() < TInstant::Now()); + && (lastAlive + Hive.GetNodeDeletePeriod() < now); } else { return (IsDisconnected() || IsUnknown()) && !Local && GetTabletsTotal() == 0 && LockedTablets.empty() && !Freeze; } diff --git a/ydb/core/mind/hive/node_info.h b/ydb/core/mind/hive/node_info.h index afc23ad92318..957ff626abf6 100644 --- a/ydb/core/mind/hive/node_info.h +++ b/ydb/core/mind/hive/node_info.h @@ -231,7 +231,7 @@ struct TNodeInfo { } } - bool CanBeDeleted() const; + bool CanBeDeleted(TInstant now) const; void RegisterInDomains(); void DeregisterInDomains(); void Ping(); diff --git a/ydb/core/mind/hive/tablet_info.h b/ydb/core/mind/hive/tablet_info.h index 433b5e988bd9..5d754ddd55d7 100644 --- a/ydb/core/mind/hive/tablet_info.h +++ b/ydb/core/mind/hive/tablet_info.h @@ -163,6 +163,7 @@ struct TTabletInfo { EBalancerPolicy BalancerPolicy; TNodeId FailedNodeId = 0; // last time we tried to start the tablet, we failed on this node bool InWaitQueue = false; + TInstant BootTime; TTabletInfo(ETabletRole role, THive& hive); TTabletInfo(const TTabletInfo&) = delete; diff --git a/ydb/core/mind/hive/tx__delete_node.cpp b/ydb/core/mind/hive/tx__delete_node.cpp new file mode 100644 index 000000000000..fa3b039826de --- /dev/null +++ b/ydb/core/mind/hive/tx__delete_node.cpp @@ -0,0 +1,38 @@ +#include "hive_impl.h" +#include "hive_log.h" + +namespace NKikimr { +namespace NHive { + +class TTxDeleteNode : public TTransactionBase { +protected: + TNodeId NodeId; +public: + TTxDeleteNode(TNodeId nodeId, THive *hive) + : TBase(hive) + , NodeId(nodeId) + {} + + bool Execute(TTransactionContext &txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + db.Table().Key(NodeId).Delete(); + auto restrictionsRowset = db.Table().Range(NodeId).Select(); + while (!restrictionsRowset.EndOfSet()) { + db.Table().Key(restrictionsRowset.GetKey()).Delete(); + if (!restrictionsRowset.Next()) { + return false; + } + } + return true; + } + + void Complete(const TActorContext&) override { + } + }; + + ITransaction* THive::CreateDeleteNode(TNodeId nodeId) { + return new TTxDeleteNode(nodeId, this); + } + +} // NHive +} // NKikimr diff --git a/ydb/core/mind/hive/tx__load_everything.cpp b/ydb/core/mind/hive/tx__load_everything.cpp index 710a14d2bc38..770dea99cad3 100644 --- a/ydb/core/mind/hive/tx__load_everything.cpp +++ b/ydb/core/mind/hive/tx__load_everything.cpp @@ -688,8 +688,9 @@ class TTxLoadEverything : public TTransactionBase { size_t numDeletedNodes = 0; size_t numDeletedRestrictions = 0; + TInstant now = TActivationContext::Now(); for (auto itNode = Self->Nodes.begin(); itNode != Self->Nodes.end();) { - if (itNode->second.CanBeDeleted()) { + if (itNode->second.CanBeDeleted(now)) { ++numDeletedNodes; auto restrictionsRowset = db.Table().Range(itNode->first).Select(); while (!restrictionsRowset.EndOfSet()) { diff --git a/ydb/core/mind/hive/tx__register_node.cpp b/ydb/core/mind/hive/tx__register_node.cpp index 7e8fb49b1f54..7c40397313e4 100644 --- a/ydb/core/mind/hive/tx__register_node.cpp +++ b/ydb/core/mind/hive/tx__register_node.cpp @@ -23,7 +23,7 @@ class TTxRegisterNode : public TTransactionBase { TNodeId nodeId = Local.NodeId(); TNodeInfo& node = Self->GetNode(nodeId); if (node.Local != Local) { - TInstant now = TInstant::Now(); + TInstant now = TActivationContext::Now(); node.Statistics.AddRestartTimestamp(now.MilliSeconds()); node.ActualizeNodeStatistics(now); for (const auto& t : node.Tablets) { @@ -57,6 +57,7 @@ class TTxRegisterNode : public TTransactionBase { db.Table().Key(nodeId).Update(false, false); } if (node.BecomeUpOnRestart) { + BLOG_TRACE("THive::TTxRegisterNode(" << Local.NodeId() << ")::Execute - node became up on restart"); node.SetDown(false); node.BecomeUpOnRestart = false; db.Table().Key(nodeId).Update(false, false); diff --git a/ydb/core/mind/hive/tx__start_tablet.cpp b/ydb/core/mind/hive/tx__start_tablet.cpp index 068f9915432b..034fd6a124bb 100644 --- a/ydb/core/mind/hive/tx__start_tablet.cpp +++ b/ydb/core/mind/hive/tx__start_tablet.cpp @@ -10,6 +10,7 @@ class TTxStartTablet : public TTransactionBase { ui64 Cookie; bool External; TSideEffects SideEffects; + bool Success; public: TTxStartTablet(TFullTabletId tabletId, const TActorId& local, ui64 cookie, bool external, THive *hive) @@ -23,10 +24,12 @@ class TTxStartTablet : public TTransactionBase { TTxType GetTxType() const override { return NHive::TXTYPE_START_TABLET; } bool Execute(TTransactionContext& txc, const TActorContext&) override { + Success = false; SideEffects.Reset(Self->SelfId()); BLOG_D("THive::TTxStartTablet::Execute Tablet " << TabletId); TTabletInfo* tablet = Self->FindTablet(TabletId); if (tablet != nullptr) { + tablet->BootTime = TActivationContext::Now(); // finish fast-move operation if (tablet->LastNodeId != 0 && tablet->LastNodeId != Local.NodeId()) { TNodeInfo* lastNode = Self->FindNode(tablet->LastNodeId); @@ -65,6 +68,7 @@ class TTxStartTablet : public TTransactionBase { new TEvLocal::TEvBootTablet(*leader.TabletStorageInfo, promotableFollowerId, leader.KnownGeneration), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, Cookie); + Success = true; return true; } else { BLOG_W("THive::TTxStartTablet::Execute, ignoring TEvBootTablet(" << leader.ToString() << ") - wrong state or node"); @@ -79,6 +83,7 @@ class TTxStartTablet : public TTransactionBase { new TEvLocal::TEvBootTablet(*follower.LeaderTablet.TabletStorageInfo, follower.Id), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, Cookie); + Success = true; return true; } else { BLOG_W("THive::TTxStartTablet::Execute, ignoring TEvBootTablet(" << follower.ToString() << ") - wrong state or node"); @@ -108,6 +113,9 @@ class TTxStartTablet : public TTransactionBase { void Complete(const TActorContext& ctx) override { BLOG_D("THive::TTxStartTablet::Complete Tablet " << TabletId << " SideEffects: " << SideEffects); SideEffects.Complete(ctx); + if (Success) { + Self->UpdateCounterTabletsStarting(+1); + } } }; diff --git a/ydb/core/mind/hive/tx__update_tablet_status.cpp b/ydb/core/mind/hive/tx__update_tablet_status.cpp index 621978d56892..c4efbbb9b24f 100644 --- a/ydb/core/mind/hive/tx__update_tablet_status.cpp +++ b/ydb/core/mind/hive/tx__update_tablet_status.cpp @@ -80,6 +80,14 @@ class TTxUpdateTabletStatus : public TTransactionBase { if (Status == TEvLocal::TEvTabletStatus::StatusOk) { tablet->Statistics.AddRestartTimestamp(now.MilliSeconds()); tablet->ActualizeTabletStatistics(now); + if (tablet->BootTime != TInstant()) { + TDuration startTime = now - tablet->BootTime; + if (startTime > TDuration::Seconds(30)) { + BLOG_W("Tablet " << tablet->GetFullTabletId() << " was starting for " << startTime.Seconds() << " seconds"); + } + Self->TabletCounters->Percentile()[NHive::COUNTER_TABLETS_START_TIME].IncrementFor(startTime.MilliSeconds()); + Self->UpdateCounterTabletsStarting(-1); + } TNodeInfo* node = Self->FindNode(Local.NodeId()); if (node == nullptr) { // event from IC about disconnection of the node could overtake events from the node itself because of Pipe Server diff --git a/ydb/core/mind/hive/ya.make b/ydb/core/mind/hive/ya.make index 5d4132950cd9..4666bb93cc34 100644 --- a/ydb/core/mind/hive/ya.make +++ b/ydb/core/mind/hive/ya.make @@ -47,6 +47,7 @@ SRCS( tx__configure_subdomain.cpp tx__create_tablet.cpp tx__cut_tablet_history.cpp + tx__delete_node.cpp tx__delete_tablet.cpp tx__delete_tablet_result.cpp tx__disconnect_node.cpp diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 5cc797504ed5..c7b40b2c8f84 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1226,18 +1226,25 @@ void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorCont void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) { const ui64 cookie = ev->Get()->GetCookie(); - Y_ABORT_UNLESS(ReadInfo.contains(cookie)); - auto it = ReadInfo.find(cookie); - Y_ABORT_UNLESS(it != ReadInfo.end()); + + // If there is no such cookie, then read was canceled. + // For example, it can be after consumer deletion + if (it == ReadInfo.end()) { + return; + } TReadInfo info = std::move(it->second); ReadInfo.erase(it); - //make readinfo class - auto& userInfo = UsersInfoStorage->GetOrCreate(info.User, ctx); + auto* userInfo = UsersInfoStorage->GetIfExists(info.User); + if (!userInfo) { + ReplyError(ctx, info.Destination, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(info.User)); + OnReadRequestFinished(info.Destination, 0, info.User, ctx); + } + TReadAnswer answer(info.FormAnswer( - ctx, *ev->Get(), EndOffset, Partition, &userInfo, + ctx, *ev->Get(), EndOffset, Partition, userInfo, info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode() )); const auto& resp = dynamic_cast(answer.Event.Get())->Response; @@ -2422,6 +2429,20 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx) } UsersInfoStorage->Remove(user, ctx); + + // Finish all ongoing reads + std::unordered_set readCookies; + for (auto& [cookie, info] : ReadInfo) { + if (info.User == user) { + readCookies.insert(cookie); + ReplyError(ctx, info.Destination, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(user)); + OnReadRequestFinished(info.Destination, 0, user, ctx); + } + } + for (ui64 cookie : readCookies) { + ReadInfo.erase(cookie); + } + Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user)); } } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index c954012caa66..31315dbc7f0b 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -623,6 +623,7 @@ class TPartition : public TActorBootstrapped { static void RemoveMessages(TMessageQueue& src, TMessageQueue& dst); void RemovePendingRequests(TMessageQueue& requests); void RemoveMessagesToQueue(TMessageQueue& requests); + static TString GetConsumerDeletedMessage(TStringBuf consumerName); private: ui64 TabletID; diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 387b51ae1938..4c7abf54e4f2 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -748,8 +748,8 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim auto* read = readEvent->Get(); const TString& user = read->ClientId; auto userInfo = UsersInfoStorage->GetIfExists(user); - if(!userInfo) { - ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder() << "cannot finish read request. Consumer " << read->ClientId << " is gone from partition"); + if (!userInfo) { + ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, GetConsumerDeletedMessage(read->ClientId)); Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user)); OnReadRequestFinished(read->Cookie, 0, user, ctx); return; @@ -1026,4 +1026,8 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u ctx.Send(BlobCache, request.Release()); } +TString TPartition::GetConsumerDeletedMessage(TStringBuf consumerName) { + return TStringBuilder() << "cannot finish read request. Consumer " << consumerName << " is gone from partition"; +} + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index abe6a4e7a0cb..aed439baa2fd 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -2326,6 +2326,100 @@ Y_UNIT_TEST(TestTabletRestoreEventsOrder) { }); } +Y_UNIT_TEST(TestReadAndDeleteConsumer) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function setup, bool& activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + activeZone = false; + tc.Runtime->SetScheduledLimit(2000); + tc.Runtime->SetScheduledEventFilter(&tc.ImmediateLogFlushAndRequestTimeoutFilter); + + TVector> data; + TString msg; + msg.resize(102400, 'a'); + for (ui64 i = 1; i <= 1000; ++i) { + data.emplace_back(i, msg); + } + + PQTabletPrepare({.maxCountInPartition=100, .deleteTime=TDuration::Days(2).Seconds(), .partitions=1}, + {{"user1", true}, {"user2", true}}, tc); + CmdWrite(0, "sourceid1", data, tc, false, {}, true); + + // Reset tablet cache + PQTabletRestart(tc); + + TAutoPtr handle; + TEvPersQueue::TEvResponse* readResult = nullptr; + THolder readRequest; + TEvPersQueue::TEvUpdateConfigResponse* consumerDeleteResult = nullptr; + THolder consumerDeleteRequest; + + // Read request + { + readRequest.Reset(new TEvPersQueue::TEvRequest); + auto req = readRequest->Record.MutablePartitionRequest(); + req->SetPartition(0); + auto read = req->MutableCmdRead(); + read->SetOffset(1); + read->SetClientId("user1"); + read->SetCount(1); + read->SetBytes(1'000'000); + read->SetTimeoutMs(5000); + } + + // Consumer delete request + { + consumerDeleteRequest.Reset(new TEvPersQueue::TEvUpdateConfig()); + consumerDeleteRequest->MutableRecord()->SetTxId(42); + auto& cfg = *consumerDeleteRequest->MutableRecord()->MutableTabletConfig(); + cfg.SetVersion(42); + cfg.AddPartitionIds(0); + cfg.AddPartitions()->SetPartitionId(0); + cfg.SetLocalDC(true); + cfg.SetTopic("topic"); + auto& cons = *cfg.AddConsumers(); + cons.SetName("user2"); + cons.SetImportant(true); + } + + TActorId edge = tc.Runtime->AllocateEdgeActor(); + + // Delete consumer during read request + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, readRequest.Release(), 0, GetPipeConfigWithRetries()); + + // Intercept TEvPQ::TEvBlobResponse event + std::vector capturedBlobResponses; + auto captureBlobResponsesObserver = tc.Runtime->AddObserver([&](TEvPQ::TEvBlobResponse::TPtr& ev) { + capturedBlobResponses.emplace_back().Swap(ev); + }); + + // Delete consumer while read request is still in progress + tc.Runtime->SendToPipe(tc.TabletId, edge, consumerDeleteRequest.Release(), 0, GetPipeConfigWithRetries()); + consumerDeleteResult = tc.Runtime->GrabEdgeEvent(handle); + { + //Cerr << "Got consumer delete response: " << consumerDeleteResult->Record << Endl; + UNIT_ASSERT(consumerDeleteResult->Record.HasStatus()); + UNIT_ASSERT_EQUAL(consumerDeleteResult->Record.GetStatus(), NKikimrPQ::EStatus::OK); + } + + // Resend intercepted blob responses and wait for read result + captureBlobResponsesObserver.Remove(); + for (auto& ev : capturedBlobResponses) { + tc.Runtime->Send(ev.Release(), 0, true); + } + + readResult = tc.Runtime->GrabEdgeEvent(handle); + { + //Cerr << "Got read response: " << readResult->Record << Endl; + UNIT_ASSERT(readResult->Record.HasStatus()); + UNIT_ASSERT_EQUAL(readResult->Record.GetErrorCode(), NPersQueue::NErrorCode::BAD_REQUEST); + UNIT_ASSERT_STRING_CONTAINS_C(readResult->Record.GetErrorReason(), "Consumer user1 is gone from partition", readResult->Record.Utf8DebugString()); + } + }); +} } // Y_UNIT_TEST_SUITE(TPQTest) diff --git a/ydb/core/protos/counters_hive.proto b/ydb/core/protos/counters_hive.proto index c07de897e559..7a39f2b3f730 100644 --- a/ydb/core/protos/counters_hive.proto +++ b/ydb/core/protos/counters_hive.proto @@ -29,7 +29,7 @@ enum ESimpleCounters { COUNTER_IMBALANCED_OBJECTS = 19 [(CounterOpts) = {Name: "ImbalancedObjects"}]; COUNTER_WORST_OBJECT_VARIANCE = 20 [(CounterOpts) = {Name: "WorstObjectVariance"}]; COUNTER_STORAGE_SCATTER = 21 [(CounterOpts) = {Name: "StorageScatter"}]; - RESERVED22 = 22; + COUNTER_TABLETS_STARTING = 22 [(CounterOpts) = {Name: "TabletsStarting"}]; COUNTER_PINGQUEUE_SIZE = 23 [(CounterOpts) = {Name: "PingQueueSize"}]; } @@ -77,6 +77,21 @@ enum EPercentileCounters { Ranges: { Value: 95 Name: "95%" }, Ranges: { Value: 100 Name: "100%" }, }]; + + COUNTER_TABLETS_START_TIME = 2 [(CounterOpts) = { + Name: "TabletsStartTimeMs", + Ranges: { Value: 1 } + Ranges: { Value: 5 } + Ranges: { Value: 10 } + Ranges: { Value: 50 } + Ranges: { Value: 100 } + Ranges: { Value: 500 } + Ranges: { Value: 1000 } + Ranges: { Value: 5000 } + Ranges: { Value: 10000 } + Ranges: { Value: 30000 } + Ranges: { Value: 60000 } + }]; } enum ETxTypes { diff --git a/ydb/core/protos/counters_replication.proto b/ydb/core/protos/counters_replication.proto index e24f15edfd1c..4c8827674efd 100644 --- a/ydb/core/protos/counters_replication.proto +++ b/ydb/core/protos/counters_replication.proto @@ -7,10 +7,17 @@ option (TabletTypeName) = "ReplicationController"; // Used as prefix for all cou enum ESimpleCounters { COUNTER_SIMPLE_IGNORE = 0; + COUNTER_SESSIONS = 1 [(CounterOpts) = {Name: "Sessions"}]; + COUNTER_WORKERS = 2 [(CounterOpts) = {Name: "Workers"}]; + COUNTER_BOOT_QUEUE = 3 [(CounterOpts) = {Name: "BootQueue"}]; + COUNTER_STOP_QUEUE = 4 [(CounterOpts) = {Name: "StopQueue"}]; + COUNTER_DATA_LAG = 5 [(CounterOpts) = {Name: "DataLag"}]; } enum ECumulativeCounters { COUNTER_CUMULATIVE_IGNORE = 0; + COUNTER_CREATE_SESSION = 1 [(CounterOpts) = {Name: "CreateSession"}]; + COUNTER_DELETE_SESSION = 2 [(CounterOpts) = {Name: "DeleteSession"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 721065cf9e58..0a72c49cc1f9 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -156,4 +156,5 @@ message TFeatureFlags { optional bool EnableOlapCompression = 142 [default = false]; optional bool EnableExternalDataSourcesOnServerless = 143 [default = true]; optional bool EnableSparsedColumns = 144 [default = false]; + optional bool EnableDriveSerialsDiscovery = 152 [default = false]; } diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index fb628b2dfa76..d5ccc68639f8 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -64,6 +64,7 @@ enum EIsolationLevel { ISOLATION_LEVEL_READ_COMMITTED = 2; ISOLATION_LEVEL_READ_UNCOMMITTED = 3; ISOLATION_LEVEL_READ_STALE = 4; + ISOLATION_LEVEL_SNAPSHOT_RO = 5; }; enum EQueryReplyFlags { diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 6be261d99bee..230deb29e0f5 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -2645,6 +2645,16 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct return; } + if (State == TShardState::PreOffline || + State == TShardState::Offline) + { + replyWithError( + Ydb::StatusIds::NOT_FOUND, + TStringBuilder() << "Shard " << TabletID() << " finished splitting/merging" + << " (node# " << SelfId().NodeId() << " state# " << DatashardStateName(State) << ")"); + return; + } + if (!IsStateNewReadAllowed()) { replyWithError( Ydb::StatusIds::OVERLOADED, diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index 1e4950575184..eaec5b5215a2 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -186,10 +186,14 @@ namespace NKqpHelpers { return KqpSimpleExec(runtime, query, true, database); } - inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) { + inline auto KqpSimpleBeginSend(TTestActorRuntime& runtime, TString& sessionId, const TString& query) { sessionId = CreateSessionRPC(runtime); + return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, /* txId */ {}, false /* commitTx */)); + } + + inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) { txId.clear(); - auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */))); + auto response = AwaitResponse(runtime, KqpSimpleBeginSend(runtime, sessionId, query)); if (response.operation().status() != Ydb::StatusIds::SUCCESS) { return TStringBuilder() << "ERROR: " << response.operation().status(); } diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 14153b3e8912..e65b10d9b119 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -3970,7 +3970,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) { Y_UNIT_TEST(HandleMvccGoneInContinue) { // TODO } -}; +} Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) { Y_UNIT_TEST(ShouldRead) { @@ -4054,7 +4054,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) { UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED); } -}; +} Y_UNIT_TEST_SUITE(DataShardReadIteratorState) { Y_UNIT_TEST(ShouldCalculateQuota) { @@ -4105,7 +4105,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorState) { UNIT_ASSERT_VALUES_EQUAL(state.Quota.Bytes, 131729); UNIT_ASSERT(state.State == NDataShard::TReadIteratorState::EState::Executing); } -}; +} Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) { Y_UNIT_TEST(CancelPageFaultedReadThenDropTable) { @@ -4755,4 +4755,70 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) { } +Y_UNIT_TEST_SUITE(DataShardReadIteratorLatency) { + + Y_UNIT_TEST(ReadSplitLatency) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + TServer::TPtr server = new TServer(serverSettings); + + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + // Insert initial data + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (6, 60), (7, 70), (8, 80), (9, 90), (10, 100);"); + + // Copy table (this will ensure original shards stay alive after split) + { + auto senderCopy = runtime.AllocateEdgeActor(); + ui64 txId = AsyncCreateCopyTable(server, senderCopy, "/Root", "table-2", "/Root/table-1"); + WaitTxNotification(server, senderCopy, txId); + } + + TBlockEvents blockedReads(runtime); + + Cerr << "... starting read from table-1" << Endl; + TString readSessionId; + auto readFuture = KqpSimpleBeginSend(runtime, readSessionId, R"( + SELECT * FROM `/Root/table-1` ORDER BY key; + )"); + + runtime.WaitFor("blocked TEvRead", [&]{ return blockedReads.size() >= 1; }); + + { + Cerr << "... splitting table-1" << Endl; + SetSplitMergePartCountLimit(server->GetRuntime(), -1); + auto shards1before = GetTableShards(server, sender, "/Root/table-1"); + ui64 txId = AsyncSplitTable(server, sender, "/Root/table-1", shards1before.at(0), 5); + Cerr << "... split txId# " << txId << " started" << Endl; + WaitTxNotification(server, sender, txId); + Cerr << "... split txId# " << txId << " finished" << Endl; + } + + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + auto readStartTs = runtime.GetCurrentTime(); + blockedReads.Unblock(); + blockedReads.Stop(); + auto readResponse = runtime.WaitFuture(std::move(readFuture)); + UNIT_ASSERT_VALUES_EQUAL(readResponse.operation().status(), Ydb::StatusIds::SUCCESS); + auto readLatency = runtime.GetCurrentTime() - readStartTs; + Cerr << "... read latency was " << readLatency << Endl; + UNIT_ASSERT_C(readLatency < TDuration::MilliSeconds(100), + "unexpected read latency " << readLatency); + } + +} + } // namespace NKikimr diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index 46e8c3dfbb0c..ffd461ca6613 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace NKikimr::NReplication { @@ -13,6 +14,13 @@ TController::TController(const TActorId& tablet, TTabletStorageInfo* info) : TActor(&TThis::StateInit) , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) , LogPrefix(this) + , TabletCountersPtr(new TProtobufTabletCounters< + ESimpleCounters_descriptor, + ECumulativeCounters_descriptor, + EPercentileCounters_descriptor, + ETxTypes_descriptor + >()) + , TabletCounters(TabletCountersPtr.Get()) { } @@ -30,6 +38,7 @@ void TController::OnTabletDead(TEvTablet::TEvTabletDead::TPtr&, const TActorCont void TController::OnActivateExecutor(const TActorContext& ctx) { CLOG_T(ctx, "OnActivateExecutor"); + Executor()->RegisterExternalTabletCounters(TabletCountersPtr.Release()); RunTxInitSchema(ctx); } @@ -292,9 +301,11 @@ void TController::Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& void TController::CreateSession(ui32 nodeId, const TActorContext& ctx) { CLOG_D(ctx, "Create session" << ": nodeId# " << nodeId); + TabletCounters->Cumulative()[COUNTER_CREATE_SESSION] += 1; Y_ABORT_UNLESS(!Sessions.contains(nodeId)); Sessions.emplace(nodeId, TSessionInfo()); + TabletCounters->Simple()[COUNTER_SESSIONS] = Sessions.size(); auto ev = MakeHolder(TabletID(), Executor()->Generation()); ui32 flags = 0; @@ -308,6 +319,7 @@ void TController::CreateSession(ui32 nodeId, const TActorContext& ctx) { void TController::DeleteSession(ui32 nodeId, const TActorContext& ctx) { CLOG_D(ctx, "Delete session" << ": nodeId# " << nodeId); + TabletCounters->Cumulative()[COUNTER_DELETE_SESSION] += 1; Y_ABORT_UNLESS(Sessions.contains(nodeId)); auto& session = Sessions[nodeId]; @@ -327,6 +339,8 @@ void TController::DeleteSession(ui32 nodeId, const TActorContext& ctx) { } Sessions.erase(nodeId); + TabletCounters->Simple()[COUNTER_SESSIONS] = Sessions.size(); + CloseSession(nodeId, ctx); ScheduleProcessQueues(); } @@ -431,6 +445,9 @@ void TController::UpdateLag(const TWorkerId& id, TDuration lag) { } target->UpdateLag(id.WorkerId(), lag); + if (const auto lag = replication->GetLag()) { + TabletCounters->Simple()[COUNTER_DATA_LAG] = lag->MilliSeconds(); + } } void TController::Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx) { @@ -486,6 +503,7 @@ TWorkerInfo* TController::GetOrCreateWorker(const TWorkerId& id, NKikimrReplicat auto it = Workers.find(id); if (it == Workers.end()) { it = Workers.emplace(id, cmd).first; + TabletCounters->Simple()[COUNTER_WORKERS] = Workers.size(); } auto replication = Find(id.ReplicationId()); @@ -499,6 +517,9 @@ TWorkerInfo* TController::GetOrCreateWorker(const TWorkerId& id, NKikimrReplicat } void TController::ScheduleProcessQueues() { + TabletCounters->Simple()[COUNTER_BOOT_QUEUE] = BootQueue.size(); + TabletCounters->Simple()[COUNTER_STOP_QUEUE] = StopQueue.size(); + if (ProcessQueuesScheduled || (!BootQueue && !StopQueue)) { return; } @@ -652,6 +673,7 @@ void TController::RemoveWorker(const TWorkerId& id, const TActorContext& ctx) { RemoveQueue.erase(id); Workers.erase(id); + TabletCounters->Simple()[COUNTER_WORKERS] = Workers.size(); auto replication = Find(id.ReplicationId()); if (!replication) { diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index b7a365501a46..d6791b3ed303 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -167,6 +168,8 @@ class TController private: const TTabletLogPrefix LogPrefix; + THolder TabletCountersPtr; + TTabletCountersBase* TabletCounters; TSysParams SysParams; THashMap Replications; diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make index bc742cbefccc..d779aa7fa500 100644 --- a/ydb/core/tx/replication/controller/ya.make +++ b/ydb/core/tx/replication/controller/ya.make @@ -5,6 +5,7 @@ PEERDIR( ydb/core/discovery ydb/core/engine/minikql ydb/core/protos + ydb/core/tablet ydb/core/tablet_flat ydb/core/tx/replication/common ydb/core/tx/replication/ydb_proxy diff --git a/ydb/core/viewer/viewer.cpp b/ydb/core/viewer/viewer.cpp index b32cc5c0eaad..a9a60a8840fd 100644 --- a/ydb/core/viewer/viewer.cpp +++ b/ydb/core/viewer/viewer.cpp @@ -1,5 +1,6 @@ #include "viewer.h" #include "counters_hosts.h" +#include "viewer_healthcheck.h" #include "json_handlers.h" #include "log.h" #include "viewer_request.h" @@ -522,6 +523,10 @@ class TViewer : public TActorBootstrapped, public IViewer { ctx.ExecutorThread.RegisterActor(new TCountersHostsList(this, ev)); return; } + if (path.StartsWith("/healthcheck")) { // healthcheck no auth scrapping + ctx.ExecutorThread.RegisterActor(new TJsonHealthCheck(this, ev)); + return; + } // TODO: check path validity // TODO: cache if (msg->Request.GetPathInfo().StartsWith('/')) { diff --git a/ydb/core/viewer/viewer_healthcheck.h b/ydb/core/viewer/viewer_healthcheck.h index 054c94cc41cd..62e5ea263ef3 100644 --- a/ydb/core/viewer/viewer_healthcheck.h +++ b/ydb/core/viewer/viewer_healthcheck.h @@ -196,8 +196,7 @@ class TJsonHealthCheck : public TViewerPipeClient { return TBase::ReplyAndPassAway(GetHTTPINTERNALERROR("text/plain", "No result")); } else { if (Format == HealthCheckResponseFormat::PROMETHEUS) { - HandlePrometheus(); - return PassAway(); + return HandlePrometheus(); } else { TStringStream json; TProtoToJson::ProtoToJson(json, *Result, JsonSettings); diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp index 8a4c9e70cece..1bd38fe24b73 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp @@ -77,7 +77,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl const auto& entry = it->second.Entry; - Y_ENSURE(entry->Pattern->IsCompiled()); + Y_ASSERT(entry->Pattern->IsCompiled()); if (it->second.LinkedInCompiledPatternLRUList()) { return; diff --git a/ydb/library/yql/public/decimal/ut/yql_decimal_ut.cpp b/ydb/library/yql/public/decimal/ut/yql_decimal_ut.cpp index 9a79bbb53d65..a179faa28ed7 100644 --- a/ydb/library/yql/public/decimal/ut/yql_decimal_ut.cpp +++ b/ydb/library/yql/public/decimal/ut/yql_decimal_ut.cpp @@ -215,6 +215,8 @@ Y_UNIT_TEST_SUITE(TYqlDecimalTest) { UNIT_ASSERT(FromStringEx("-1e-99", 10, 2) == 0); UNIT_ASSERT(FromStringEx("-510e-3", 1, 0) == -1); UNIT_ASSERT(FromStringEx("+99E3", 5, 0) == 99000); + UNIT_ASSERT(FromStringEx("2.1E-130", 35, 2) == 0); + UNIT_ASSERT(FromStringEx("2.1E0", 35, 2) == 210); } Y_UNIT_TEST(TestFormStringExInvalidValues) { @@ -225,8 +227,11 @@ Y_UNIT_TEST_SUITE(TYqlDecimalTest) { UNIT_ASSERT(IsError(FromStringEx("E2", 35, 15))); // empty UNIT_ASSERT(IsError(FromStringEx("E2E4", 35, 15))); // empty - UNIT_ASSERT(IsError(FromStringEx("12E0", 35, 15))); // zero isn't avail UNIT_ASSERT(IsError(FromStringEx("NANE5", 35, 15))); // nan with exp + UNIT_ASSERT(IsError(FromStringEx("infE5", 35, 15))); // inf with exp + UNIT_ASSERT(IsError(FromStringEx("-infe-5", 35, 15))); // inf with exp + UNIT_ASSERT(IsError(FromStringEx("2.1E0X", 35, 2))); // not fully parsed exp + UNIT_ASSERT(IsError(FromStringEx("2.1E+-1", 35, 2))); // two signs } Y_UNIT_TEST(TestSpecialAsString) { diff --git a/ydb/library/yql/public/decimal/yql_decimal.cpp b/ydb/library/yql/public/decimal/yql_decimal.cpp index 89fc92cd49be..163617b39673 100644 --- a/ydb/library/yql/public/decimal/yql_decimal.cpp +++ b/ydb/library/yql/public/decimal/yql_decimal.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace NYql { namespace NDecimal { @@ -10,6 +11,10 @@ namespace NDecimal { static const TUint128 Ten(10U); TUint128 GetDivider(ui8 scale) { + if (scale > MaxPrecision) { + return Inf(); + } + TUint128 d(1U); while (scale--) d *= Ten; @@ -217,8 +222,16 @@ TInt128 FromStringEx(const TStringBuf& str, ui8 precision, ui8 scale) { if (!len) return Err(); - const auto exp = std::atoi(++ptr); - if (!exp) + ++ptr; + if (ptr != s + str.size() && *ptr == '+') { + ++ptr; + if (ptr != s + str.size() && *ptr == '-') + return Err(); + } + + int exp; + auto [finish, ec] = std::from_chars(ptr, s + str.size(), exp); + if (ec != std::errc() || finish != s + str.size()) return Err(); const int p = precision, s = int(scale) + exp; @@ -231,8 +244,19 @@ TInt128 FromStringEx(const TStringBuf& str, ui8 precision, ui8 scale) { return Err(); } + if (IsInf(r)) { + auto p = str.data(); + if (*p == '+' || *p == '-') + ++p; + + if (!std::isdigit(*p)) + return Err(); + + return r; + } + if (const auto e = exp > 0 ? std::max(0, s - p) : std::min(0, s)) { - if (r && IsNormal(r)) { + if (r) { if (exp > 0) return Mul(r, GetDivider(+e)); if (exp < 0) diff --git a/ydb/library/yql/sql/v1/SQLv1.g.in b/ydb/library/yql/sql/v1/SQLv1.g.in index 76b691b81e56..f92b02c8babd 100644 --- a/ydb/library/yql/sql/v1/SQLv1.g.in +++ b/ydb/library/yql/sql/v1/SQLv1.g.in @@ -596,12 +596,12 @@ alter_external_data_source_action: drop_external_data_source_stmt: DROP EXTERNAL DATA SOURCE (IF EXISTS)? object_ref; -create_view_stmt: CREATE VIEW object_ref +create_view_stmt: CREATE VIEW (IF NOT EXISTS)? object_ref create_object_features? AS select_stmt ; -drop_view_stmt: DROP VIEW object_ref; +drop_view_stmt: DROP VIEW (IF EXISTS)? object_ref; upsert_object_stmt: UPSERT OBJECT object_ref LPAREN TYPE object_type_ref RPAREN diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp index 6f589f325a12..8d7c91805880 100644 --- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp +++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp @@ -1579,9 +1579,16 @@ FROM Input MATCH_RECOGNIZE (PATTERN (A) DEFINE A AS A); } Y_UNIT_TEST(CreateView) { - TCases cases = { - {"creAte vIEw TheView wiTh (security_invoker = trUE) As SELect 1", - "CREATE VIEW TheView WITH (security_invoker = TRUE) AS\nSELECT\n\t1;\n"}, + TCases cases = {{ + "creAte vIEw TheView As SELect 1", + "CREATE VIEW TheView AS\nSELECT\n\t1;\n" + }, { + "creAte vIEw If Not ExIsTs TheView As SELect 1", + "CREATE VIEW IF NOT EXISTS TheView AS\nSELECT\n\t1;\n" + }, { + "creAte vIEw TheView wiTh (option = tRuE) As SELect 1", + "CREATE VIEW TheView WITH (option = TRUE) AS\nSELECT\n\t1;\n" + } }; TSetup setup; @@ -1589,9 +1596,13 @@ FROM Input MATCH_RECOGNIZE (PATTERN (A) DEFINE A AS A); } Y_UNIT_TEST(DropView) { - TCases cases = { - {"dRop viEW theVIEW", - "DROP VIEW theVIEW;\n"}, + TCases cases = {{ + "dRop viEW theVIEW", + "DROP VIEW theVIEW;\n" + }, { + "dRop viEW iF EXistS theVIEW", + "DROP VIEW IF EXISTS theVIEW;\n" + } }; TSetup setup; diff --git a/ydb/library/yql/sql/v1/sql_query.cpp b/ydb/library/yql/sql/v1/sql_query.cpp index f2c0a9e26e04..40c1b6727204 100644 --- a/ydb/library/yql/sql/v1/sql_query.cpp +++ b/ydb/library/yql/sql/v1/sql_query.cpp @@ -1220,11 +1220,11 @@ bool TSqlQuery::Statement(TVector& blocks, const TRule_sql_stmt_core& break; } case TRule_sql_stmt_core::kAltSqlStmtCore42: { - // create_view_stmt: CREATE VIEW name WITH (k = v, ...) AS select_stmt; + // create_view_stmt: CREATE VIEW (IF NOT EXISTS)? name (WITH (k = v, ...))? AS select_stmt; auto& node = core.GetAlt_sql_stmt_core42().GetRule_create_view_stmt1(); TObjectOperatorContext context(Ctx.Scoped); - if (node.GetRule_object_ref3().HasBlock1()) { - if (!ClusterExpr(node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1(), + if (node.GetRule_object_ref4().HasBlock1()) { + if (!ClusterExpr(node.GetRule_object_ref4().GetBlock1().GetRule_cluster_expr1(), false, context.ServiceId, context.Cluster)) { @@ -1232,34 +1232,36 @@ bool TSqlQuery::Statement(TVector& blocks, const TRule_sql_stmt_core& } } + const bool existingOk = node.HasBlock3(); + std::map features; - if (node.HasBlock4()) { - if (!ParseObjectFeatures(features, node.GetBlock4().GetRule_create_object_features1().GetRule_object_features2())) { + if (node.HasBlock5()) { + if (!ParseObjectFeatures(features, node.GetBlock5().GetRule_create_object_features1().GetRule_object_features2())) { return false; } } - if (!ParseViewQuery(features, node.GetRule_select_stmt6())) { + if (!ParseViewQuery(features, node.GetRule_select_stmt7())) { return false; } - const TString objectId = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second; + const TString objectId = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second; constexpr const char* TypeId = "VIEW"; AddStatementToBlocks(blocks, BuildCreateObjectOperation(Ctx.Pos(), BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), objectId), TypeId, - false, + existingOk, false, std::move(features), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore43: { - // drop_view_stmt: DROP VIEW name; + // drop_view_stmt: DROP VIEW (IF EXISTS)? name; auto& node = core.GetAlt_sql_stmt_core43().GetRule_drop_view_stmt1(); TObjectOperatorContext context(Ctx.Scoped); - if (node.GetRule_object_ref3().HasBlock1()) { - if (!ClusterExpr(node.GetRule_object_ref3().GetBlock1().GetRule_cluster_expr1(), + if (node.GetRule_object_ref4().HasBlock1()) { + if (!ClusterExpr(node.GetRule_object_ref4().GetBlock1().GetRule_cluster_expr1(), false, context.ServiceId, context.Cluster)) { @@ -1267,13 +1269,15 @@ bool TSqlQuery::Statement(TVector& blocks, const TRule_sql_stmt_core& } } - const TString objectId = Id(node.GetRule_object_ref3().GetRule_id_or_at2(), *this).second; + const bool missingOk = node.HasBlock3(); + + const TString objectId = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second; constexpr const char* TypeId = "VIEW"; AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx.Pos(), BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), objectId), TypeId, - false, + missingOk, {}, context)); break; diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index 5fda9c17726d..1570fabfc4b7 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -2691,7 +2691,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { auto req = Sprintf(reqTpl, key.c_str(), value.c_str()); auto res = SqlToYql(req); UNIT_ASSERT(res.Root); - + TVerifyLineFunc verifyLine = [&key, &value](const TString& word, const TString& line) { if (word == "Write") { UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); @@ -6592,6 +6592,28 @@ Y_UNIT_TEST_SUITE(TViewSyntaxTest) { UNIT_ASSERT_C(res.Root, res.Issues.ToString()); } + Y_UNIT_TEST(CreateViewIfNotExists) { + constexpr const char* name = "TheView"; + NYql::TAstParseResult res = SqlToYql(std::format(R"( + USE plato; + CREATE VIEW IF NOT EXISTS {} WITH (security_invoker = TRUE) AS SELECT 1; + )", name + )); + UNIT_ASSERT_C(res.Root, res.Issues.ToString()); + + TVerifyLineFunc verifyLine = [&](const TString& word, const TString& line) { + if (word == "Write!") { + UNIT_ASSERT_STRING_CONTAINS(line, name); + UNIT_ASSERT_STRING_CONTAINS(line, "createObjectIfNotExists"); + } + }; + + TWordCountHive elementStat = { {"Write!"} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(elementStat["Write!"], 1); + } + Y_UNIT_TEST(CreateViewFromTable) { constexpr const char* path = "/PathPrefix/TheView"; constexpr const char* query = R"( @@ -6671,6 +6693,28 @@ Y_UNIT_TEST_SUITE(TViewSyntaxTest) { UNIT_ASSERT_VALUES_EQUAL(elementStat["Write!"], 1); } + Y_UNIT_TEST(DropViewIfExists) { + constexpr const char* name = "TheView"; + NYql::TAstParseResult res = SqlToYql(std::format(R"( + USE plato; + DROP VIEW IF EXISTS {}; + )", name + )); + UNIT_ASSERT_C(res.Root, res.Issues.ToString()); + + TVerifyLineFunc verifyLine = [&](const TString& word, const TString& line) { + if (word == "Write!") { + UNIT_ASSERT_STRING_CONTAINS(line, name); + UNIT_ASSERT_STRING_CONTAINS(line, "dropObjectIfExists"); + } + }; + + TWordCountHive elementStat = { {"Write!"} }; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(elementStat["Write!"], 1); + } + Y_UNIT_TEST(CreateViewWithTablePrefix) { NYql::TAstParseResult res = SqlToYql(R"( USE plato; @@ -6714,7 +6758,7 @@ Y_UNIT_TEST_SUITE(TViewSyntaxTest) { UNIT_ASSERT_VALUES_EQUAL(elementStat["Write!"], 1); } - + Y_UNIT_TEST(YtAlternativeSchemaSyntax) { NYql::TAstParseResult res = SqlToYql(R"( SELECT * FROM plato.Input WITH schema(y Int32, x String not null); @@ -6783,7 +6827,7 @@ Y_UNIT_TEST_SUITE(CompactNamedExprs) { pragma CompactNamedExprs; pragma ValidateUnusedExprs; - define subquery $x() as + define subquery $x() as select count(1, 2); end define; select 1; @@ -6806,7 +6850,7 @@ Y_UNIT_TEST_SUITE(CompactNamedExprs) { pragma CompactNamedExprs; pragma DisableValidateUnusedExprs; - define subquery $x() as + define subquery $x() as select count(1, 2); end define; select 1; diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp index fdd793b30f11..771fd1d7c971 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp @@ -843,7 +843,7 @@ void TReadSessionActor::SetupBytesReadByUserAgentCounter() { ->GetSubgroup("host", "") ->GetSubgroup("protocol", "pqv0") ->GetSubgroup("consumer", ClientPath) - ->GetSubgroup("user_agent", V1::CleanupCounterValueString(UserAgent)) + ->GetSubgroup("user_agent", V1::DropUserAgentSuffix(V1::CleanupCounterValueString(UserAgent))) ->GetExpiringNamedCounter("sensor", "BytesReadByUserAgent", true); } diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp index ccdef2238730..5c2b16387390 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp @@ -262,7 +262,7 @@ void TWriteSessionActor::SetupBytesWrittenByUserAgentCounter() { ->GetSubgroup("host", "") ->GetSubgroup("protocol", "pqv0") ->GetSubgroup("topic", FullConverter->GetFederationPath()) - ->GetSubgroup("user_agent", V1::CleanupCounterValueString(UserAgent)) + ->GetSubgroup("user_agent", V1::DropUserAgentSuffix(V1::CleanupCounterValueString(UserAgent))) ->GetExpiringNamedCounter("sensor", "BytesWrittenByUserAgent", true); } diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index 0c1f2f79ed1f..d1282a7fbacd 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -544,6 +544,10 @@ namespace NKikimr::NGRpcProxy::V1 { return path; } + const TMaybe& GetCdcStreamName() const { + return CdcStreamName; + } + void SendDescribeProposeRequest(bool showPrivate = false) { return TBase::SendDescribeProposeRequest(this->ActorContext(), showPrivate); } @@ -599,6 +603,10 @@ namespace NKikimr::NGRpcProxy::V1 { if (static_cast(this)->IsCdcStreamCompatible()) { Y_ABORT_UNLESS(response.ListNodeEntry->Children.size() == 1); PrivateTopicName = response.ListNodeEntry->Children.at(0).Name; + + if (response.Self) { + CdcStreamName = response.Self->Info.GetName(); + } SendDescribeProposeRequest(true); return true; } @@ -616,6 +624,8 @@ namespace NKikimr::NGRpcProxy::V1 { TIntrusiveConstPtr PQGroupInfo; TIntrusiveConstPtr Self; TMaybe PrivateTopicName; + TMaybe CdcStreamName; + }; } diff --git a/ydb/services/persqueue_v1/actors/helpers.cpp b/ydb/services/persqueue_v1/actors/helpers.cpp index dccd2f9e717a..eec9fe768007 100644 --- a/ydb/services/persqueue_v1/actors/helpers.cpp +++ b/ydb/services/persqueue_v1/actors/helpers.cpp @@ -29,7 +29,7 @@ bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data) { TString CleanupCounterValueString(const TString& value) { - // Internal Monitoring system requires metrics values to no longer than 200 characters + // Internal Monitoring system requires metrics values to be no longer than 200 characters // and prohibits some ASCII characters. TString clean; @@ -55,4 +55,15 @@ TString CleanupCounterValueString(const TString& value) { return clean; } + +TString DropUserAgentSuffix(const TString& userAgent) { + auto ua = TStringBuf(userAgent); + TStringBuf beforeParen, afterParen; + ua.Split('(', beforeParen, afterParen); + while (beforeParen.ends_with(' ')) { + beforeParen.Chop(1); + } + return TString(beforeParen); +} + } diff --git a/ydb/services/persqueue_v1/actors/helpers.h b/ydb/services/persqueue_v1/actors/helpers.h index a2e017fc8a25..bcf3bd2e0b8a 100644 --- a/ydb/services/persqueue_v1/actors/helpers.h +++ b/ydb/services/persqueue_v1/actors/helpers.h @@ -18,5 +18,6 @@ bool HasMessages(const PersQueue::V1::MigrationStreamingReadServerMessage::DataB bool HasMessages(const Topic::StreamReadMessage::ReadResponse& data); TString CleanupCounterValueString(const TString& value); +TString DropUserAgentSuffix(const TString& userAgent); } diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index 91b132470b29..09880d47f54e 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -899,7 +899,7 @@ void TReadSessionActor::SetupBytesReadByUserAgentCounter() ->GetSubgroup("protocol", protocol) ->GetSubgroup("consumer", ClientPath) ->GetSubgroup("sdk_build_info", CleanupCounterValueString(SdkBuildInfo)) - ->GetSubgroup("user_agent", CleanupCounterValueString(UserAgent)) + ->GetSubgroup("user_agent", DropUserAgentSuffix(CleanupCounterValueString(UserAgent))) ->GetExpiringNamedCounter("sensor", "BytesReadByUserAgent", true); } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 63749dfc71a3..85fe02794216 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1587,7 +1587,18 @@ void TAlterTopicActorInternal::HandleCacheNavigateResponse(TEvTxProxySchemeCache } TUpdateSchemeBase::HandleCacheNavigateResponse(ev); auto& schemeTx = Response->Response.ModifyScheme; - FillModifyScheme(schemeTx, ActorContext(), GetRequest().WorkingDir, GetRequest().Name); + std::pair pathPair; + try { + pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath()); + } catch (const std::exception &ex) { + Response->Response.Issues.AddIssue(NYql::ExceptionToIssue(ex)); + RespondWithCode(Ydb::StatusIds::BAD_REQUEST); + return; + } + + const auto& workingDir = pathPair.first; + const auto& name = pathPair.second; + FillModifyScheme(schemeTx, ActorContext(), workingDir, name); } void TAlterTopicActorInternal::ModifyPersqueueConfig( @@ -1601,7 +1612,7 @@ void TAlterTopicActorInternal::ModifyPersqueueConfig( TString error; Y_UNUSED(selfInfo); - auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, false); + auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, GetCdcStreamName().Defined()); if (!error.empty()) { Response->Response.Issues.AddIssue(error); } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp index 6b414c0c551c..fa2722931405 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp @@ -502,7 +502,7 @@ void TWriteSessionActor::SetupBytesWrittenByUserAgentCount ->GetSubgroup("protocol", protocol) ->GetSubgroup("topic", topicPath) ->GetSubgroup("sdk_build_info", CleanupCounterValueString(SdkBuildInfo)) - ->GetSubgroup("user_agent", CleanupCounterValueString(UserAgent)) + ->GetSubgroup("user_agent", DropUserAgentSuffix(CleanupCounterValueString(UserAgent))) ->GetExpiringNamedCounter("sensor", "BytesWrittenByUserAgent", true); } diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index c6459123ee15..b1125622cb94 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -425,7 +425,7 @@ namespace NKikimr::NPersQueueTests { } else { UNIT_FAIL("Neither topic nor consumer were provided"); } - UNIT_ASSERT_VALUES_EQUAL(labels["user_agent"].GetString(), NGRpcProxy::V1::CleanupCounterValueString(userAgent)); + UNIT_ASSERT_VALUES_EQUAL(labels["user_agent"].GetString(), NGRpcProxy::V1::DropUserAgentSuffix(NGRpcProxy::V1::CleanupCounterValueString(userAgent))); } }; @@ -439,7 +439,7 @@ namespace NKikimr::NPersQueueTests { UNIT_ASSERT(result.IsSuccess()); } - static constexpr auto userAgent = "test-client/v0.1 ' ?*'\"`| "; + static constexpr auto userAgent = "test-client/v0.1 ' ?*'\"`| (some build info (codename); os 1.0)"; { auto newDriverCfg = driverCfg; @@ -624,7 +624,7 @@ namespace NKikimr::NPersQueueTests { } else { UNIT_FAIL("Neither topic nor consumer were provided"); } - UNIT_ASSERT_VALUES_EQUAL(labels["user_agent"].GetString(), NGRpcProxy::V1::CleanupCounterValueString(userAgent)); + UNIT_ASSERT_VALUES_EQUAL(labels["user_agent"].GetString(), NGRpcProxy::V1::DropUserAgentSuffix(NGRpcProxy::V1::CleanupCounterValueString(userAgent))); } }; @@ -638,7 +638,7 @@ namespace NKikimr::NPersQueueTests { UNIT_ASSERT(result.IsSuccess()); } - static constexpr auto userAgent = "test-client/v0.1 ' ?*'\"`| "; + static constexpr auto userAgent = "test-client/v0.1 ' ?*'\"`| (some build info (codename); os 1.0)"; { auto newDriverCfg = driverCfg; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 293ff9aa6edb..d4e06221a56f 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -3676,7 +3676,8 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; } else { UNIT_FAIL("Neither topic nor consumer were provided"); } - UNIT_ASSERT_VALUES_EQUAL(labels["user_agent"].GetString(), NGRpcProxy::V1::CleanupCounterValueString(userAgent)); + UNIT_ASSERT_VALUES_EQUAL(labels["user_agent"].GetString(), "test-client/v0.1"); + UNIT_ASSERT_VALUES_EQUAL(labels["user_agent"].GetString(), NGRpcProxy::V1::DropUserAgentSuffix(NGRpcProxy::V1::CleanupCounterValueString(userAgent))); } }; @@ -3711,7 +3712,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; auto driver = server.AnnoyingClient->GetDriver(); - static constexpr auto userAgent = "test-client/v0.1 ' ?*'\"`| "; + static constexpr auto userAgent = "test-client/v0.1 ' ?*'\"`| (some build info (codename); os 1.0)"; auto writer = CreateWriter( *driver, @@ -3865,8 +3866,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; "", "Dc1", consumerName, consumerPath ); - checkUserAgentCounters(server.CleverServer->GetRuntime()->GetMonPort(), - "BytesReadByUserAgent", "pqv1", userAgent, "", consumerPath); + checkUserAgentCounters(monPort, "BytesReadByUserAgent", "pqv1", userAgent, "", consumerPath); } };