diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index f3d20a068cb8..56cd85586d8d 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -218,6 +218,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrappedRecord.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); req->Record.MutableRequest()->SetKeepSession(false); req->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig)); + req->Record.MutableRequest()->SetUsePublicResponseDataFormat(true); req->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); req->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); @@ -274,9 +275,14 @@ class TPersQueueMetaCacheActor : public TActorBootstrappedGet()->Record.GetRef(); - Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1); - const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0); - ui64 newVersion = rr.ListSize() == 0 ? 0 : rr.GetList(0).GetStruct(0).GetOptional().GetInt64(); + Y_VERIFY(record.GetResponse().YdbResultsSize() == 1); + NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0)); + + ui64 newVersion = 0; + if (parser.RowsCount() != 0) { + parser.TryNextRow(); + newVersion = *parser.ColumnParser(0).GetOptionalInt64(); + } LastVersionUpdate = ctx.Now(); if (newVersion > CurrentTopicsVersion || CurrentTopicsVersion == 0 || SkipVersionCheck) { @@ -293,17 +299,18 @@ class TPersQueueMetaCacheActor : public TActorBootstrappedGet()->Record.GetRef(); - Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1); + Y_VERIFY(record.GetResponse().YdbResultsSize() == 1); TString path, dc; - const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0); - for (const auto& row : rr.GetList()) { - - path = row.GetStruct(0).GetOptional().GetText(); - dc = row.GetStruct(1).GetOptional().GetText(); + NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0)); + const ui32 rowCount = parser.RowsCount(); + while (parser.TryNextRow()) { + path = *parser.ColumnParser(0).GetOptionalUtf8(); + dc = *parser.ColumnParser(1).GetOptionalUtf8(); NewTopics.emplace_back(decltype(NewTopics)::value_type{path, dc}); } - if (rr.ListSize() > 0) { + + if (rowCount > 0) { LastTopicKey = {path, dc}; return RunQuery(EQueryType::EGetTopics, ctx); } else { @@ -710,7 +717,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped); + DynamicNodesMapping.reset(new THashMap); } while(!NodesMappingWaiters.empty()) { ctx.Send(NodesMappingWaiters.front(), diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index d8827d029dd0..5e7b9949456a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -325,7 +325,7 @@ namespace { createSequenceSettings.Name = TString(createSequence.Sequence()); createSequenceSettings.Temporary = TString(createSequence.Temporary()) == "true" ? true : false; createSequenceSettings.SequenceSettings = ParseSequenceSettings(createSequence.SequenceSettings()); - createSequenceSettings.SequenceSettings.DataType = TString(createSequence.ValueType()); + createSequenceSettings.SequenceSettings.DataType = TString(createSequence.ValueType()); return createSequenceSettings; } @@ -1050,7 +1050,7 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformerResults.size()); + Y_ABORT_UNLESS(resultIndex < runResult->Results.size()); auto resultValue = runResult->Results[resultIndex]; YQL_ENSURE(resultValue); @@ -1744,9 +1744,9 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformerName, indexIter->Type, indexName); if (indexTablePaths.size() != 1) { ctx.AddError( @@ -2413,7 +2413,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformerAnalyze(cluster, analyzeSettings); - + return WrapFuture(future, [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { Y_UNUSED(res); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c97381ba82e4..6831cb70c0ec 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1892,10 +1892,10 @@ class TKqpSessionActor : public TActorBootstrapped { AddTrailingInfo(response->Record.GetRef()); NDataIntegrity::LogIntegrityTrails( - request->Get()->GetTraceId(), - request->Get()->GetAction(), - request->Get()->GetType(), - response, + request->Get()->GetTraceId(), + request->Get()->GetAction(), + request->Get()->GetType(), + response, TlsActivationContext->AsActorContext() ); @@ -1955,7 +1955,7 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->UserRequestContext->TraceId, QueryState->GetAction(), QueryState->GetType(), - QueryResponse, + QueryResponse, TlsActivationContext->AsActorContext() ); diff --git a/ydb/core/persqueue/cluster_tracker.cpp b/ydb/core/persqueue/cluster_tracker.cpp index 4b1d757b1fca..2b0d15f693e5 100644 --- a/ydb/core/persqueue/cluster_tracker.cpp +++ b/ydb/core/persqueue/cluster_tracker.cpp @@ -13,6 +13,8 @@ #include #include +#include + namespace NKikimr::NPQ::NClusterTracker { inline auto& Ctx() { @@ -132,6 +134,7 @@ class TClusterTracker: public TActorBootstrapped { req->Record.MutableRequest()->SetKeepSession(false); req->Record.MutableRequest()->SetQuery(MakeListClustersQuery()); req->Record.MutableRequest()->SetDatabase(GetDatabase()); + req->Record.MutableRequest()->SetUsePublicResponseDataFormat(true); // useless without explicit session // req->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); req->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); @@ -144,45 +147,52 @@ class TClusterTracker: public TActorBootstrapped { LOG_DEBUG_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "HandleWhileWorking TEvQueryResponse"); const auto& record = ev->Get()->Record.GetRef(); - if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS && record.GetResponse().GetResults(0).GetValue().GetStruct(0).ListSize()) { - LOG_DEBUG_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "HandleWhileWorking TEvQueryResponse UpdateClustersList"); - UpdateClustersList(record); + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0)); + if (parser.RowsCount()) { + LOG_DEBUG_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "HandleWhileWorking TEvQueryResponse UpdateClustersList"); + UpdateClustersList(parser); - Y_ABORT_UNLESS(ClustersList); - Y_ABORT_UNLESS(ClustersList->Clusters.size()); - Y_ABORT_UNLESS(ClustersListUpdateTimestamp && *ClustersListUpdateTimestamp); + Y_ABORT_UNLESS(ClustersList); + Y_ABORT_UNLESS(ClustersList->Clusters.size()); + Y_ABORT_UNLESS(ClustersListUpdateTimestamp && *ClustersListUpdateTimestamp); - BroadcastClustersUpdate(); + BroadcastClustersUpdate(); - Schedule(TDuration::Seconds(Cfg().GetClustersUpdateTimeoutSec()), new TEvents::TEvWakeup); - } else { - LOG_ERROR_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "failed to list clusters: " << record); + Schedule(TDuration::Seconds(Cfg().GetClustersUpdateTimeoutSec()), new TEvents::TEvWakeup); + return; + } + } - ClustersList = nullptr; + LOG_ERROR_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "failed to list clusters: " << record); - Schedule(TDuration::Seconds(Cfg().GetClustersUpdateTimeoutOnErrorSec()), new TEvents::TEvWakeup); - } + ClustersList = nullptr; + Schedule(TDuration::Seconds(Cfg().GetClustersUpdateTimeoutOnErrorSec()), new TEvents::TEvWakeup); } template - void UpdateClustersList(const TProtoRecord& record) { + void UpdateClustersList(TProtoRecord& parser) { auto clustersList = MakeIntrusive(); - auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); - clustersList->Clusters.resize(t.ListSize()); + clustersList->Clusters.resize(parser.RowsCount()); - for (size_t i = 0; i < t.ListSize(); ++i) { + bool firstRow = parser.TryNextRow(); + YQL_ENSURE(firstRow); + clustersList->Version = *parser.ColumnParser(5).GetOptionalInt64(); + size_t i = 0; + + do { auto& cluster = clustersList->Clusters[i]; - cluster.Name = t.GetList(i).GetStruct(0).GetOptional().GetText(); + cluster.Name = *parser.ColumnParser(0).GetOptionalUtf8(); cluster.Datacenter = cluster.Name; - cluster.Balancer = t.GetList(i).GetStruct(1).GetOptional().GetText(); + cluster.Balancer = *parser.ColumnParser(1).GetOptionalUtf8(); - cluster.IsLocal = t.GetList(i).GetStruct(2).GetOptional().GetBool(); - cluster.IsEnabled = t.GetList(i).GetStruct(3).GetOptional().GetBool(); - cluster.Weight = t.GetList(i).GetStruct(4).GetOptional().GetUint64(); - } + cluster.IsLocal = *parser.ColumnParser(2).GetOptionalBool(); + cluster.IsEnabled = *parser.ColumnParser(3).GetOptionalBool(); + cluster.Weight = *parser.ColumnParser(4).GetOptionalUint64(); - clustersList->Version = t.GetList(0).GetStruct(5).GetOptional().GetInt64(); + ++i; + } while (parser.TryNextRow()); ClustersList = std::move(clustersList); ClustersListUpdateTimestamp = Ctx().Now(); diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h index 0268b7d88e67..c7e52fe92676 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h +++ b/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h @@ -10,6 +10,8 @@ #include #include +#include + namespace NKikimr::NPQ::NPartitionChooser { @@ -137,6 +139,7 @@ class TTableHelper { ev->Record.MutableRequest()->SetSessionId(KqpSessionId); ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false); ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + ev->Record.MutableRequest()->SetUsePublicResponseDataFormat(true); // keep compiled query in cache. ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); @@ -166,21 +169,20 @@ class TTableHelper { return false; } - auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); - + NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0)); TxId = record.GetResponse().GetTxMeta().id(); Y_ABORT_UNLESS(!TxId.empty()); - if (t.ListSize() != 0) { - auto& list = t.GetList(0); - auto& tt = list.GetStruct(0); - if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition - auto accessTime = list.GetStruct(2).GetOptional().GetUint64(); + while(parser.TryNextRow()) { + auto tt = parser.ColumnParser(0).GetOptionalUint32(); + + if (tt.Defined()) { //already got partition + auto accessTime = *parser.ColumnParser(2).GetOptionalUint64(); if (accessTime > AccessTime) { // AccessTime - PartitionId_ = tt.GetOptional().GetUint32(); - CreateTime = list.GetStruct(1).GetOptional().GetUint64(); + PartitionId_ = *tt; + CreateTime = *parser.ColumnParser(1).GetOptionalUint64(); AccessTime = accessTime; - SeqNo_ = list.GetStruct(3).GetOptional().GetUint64(); + SeqNo_ = *parser.ColumnParser(3).GetOptionalUint64(); } } } @@ -254,7 +256,7 @@ class TTableHelper { const TString TopicHashName; NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId; - + NPQ::ESourceIdTableGeneration TableGeneration; TString SelectQuery; TString UpdateQuery; diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp index 875c768a7195..07e2f5762195 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace NKikimr { namespace NGRpcProxy { @@ -45,15 +46,16 @@ void TClustersUpdater::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TA auto& record = ev->Get()->Record.GetRef(); if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { - auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); bool local = false; TVector clusters; - for (size_t i = 0; i < t.ListSize(); ++i) { - TString dc = t.GetList(i).GetStruct(0).GetOptional().GetText(); - local = t.GetList(i).GetStruct(1).GetOptional().GetBool(); + NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0)); + + while(parser.TryNextRow()) { + TString dc = *parser.ColumnParser(0).GetOptionalUtf8(); + local = *parser.ColumnParser(1).GetOptionalBool(); clusters.push_back(dc); if (local) { - bool enabled = t.GetList(i).GetStruct(2).GetOptional().GetBool(); + bool enabled = *parser.ColumnParser(2).GetOptionalBool(); Y_ABORT_UNLESS(LocalCluster.empty() || LocalCluster == dc); bool changed = LocalCluster != dc || Enabled != enabled; if (changed) {