Skip to content

Commit

Permalink
fix sequence requests lost and add debugging tools to sequence proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit committed Dec 19, 2023
1 parent 1f88488 commit cf7b042
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 19 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class TKqpSequencerActor : public NActors::TActorBootstrapped<TKqpSequencerActor
finished = (status == NUdf::EFetchStatus::Finish)
&& (UnprocessedRows == 0);

if (WaitingReplies == 0) {
if (PendingRows.size() > 0 && WaitingReplies == 0) {
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/session_actor/kqp_tx.h>

#include <ydb/library/actors/core/monotonic_provider.h>

#include <util/generic/noncopyable.h>
#include <util/generic/string.h>

Expand All @@ -33,7 +35,7 @@ class TKqpQueryState : public TNonCopyable {
TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database,
const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
NWilson::TTraceId&& traceId, const TString& sessionId)
NWilson::TTraceId&& traceId, const TString& sessionId, TMonotonic startedAt)
: QueryId(queryId)
, Database(database)
, Cluster(cluster)
Expand All @@ -46,6 +48,7 @@ class TKqpQueryState : public TNonCopyable {
, StartTime(TInstant::Now())
, KeepSession(ev->Get()->GetKeepSession() || longSession)
, UserToken(ev->Get()->GetUserToken())
, StartedAt(startedAt)
{
RequestEv.reset(ev->Release().Release());

Expand Down Expand Up @@ -98,6 +101,7 @@ class TKqpQueryState : public TNonCopyable {
NKqpProto::TKqpStatsQuery Stats;
bool KeepSession = false;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
NActors::TMonotonic StartedAt;

THashMap<NKikimr::TTableId, ui64> TableVersions;

Expand Down
11 changes: 7 additions & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
ev->Get()->SetClientLostAction(selfId, as);
QueryState = std::make_shared<TKqpQueryState>(
ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession,
Settings.TableService, Settings.QueryService, std::move(id), SessionId);
Settings.TableService, Settings.QueryService, std::move(id), SessionId,
AppData()->MonotonicTimeProvider->Now());
if (QueryState->UserRequestContext->TraceId.empty()) {
QueryState->UserRequestContext->TraceId = UlidGen.Next().ToString();
}
Expand Down Expand Up @@ -1309,10 +1310,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId);

TString reason = TStringBuilder() << "Request timeout exceeded, cancelling after "
<< (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds()
<< " milliseconds.";

if (ExecuterId) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(
msg.GetStatusCode(),
"Request timeout exceeded");
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), reason);
Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery);
} else {
const auto& issues = ev->Get()->GetIssues();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ namespace NSequenceProxy {
auto& info = AllocateInFlight[cookie];
info.Database = database;
info.PathId = pathId;
Counters->SequenceShardAllocateCount->Collect(cache);
Register(new TAllocateActor(SelfId(), cookie, tabletId, pathId, cache));
return cookie;
}
Expand Down
53 changes: 42 additions & 11 deletions ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "sequenceproxy_impl.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/library/yql/public/issue/yql_issue_manager.h>

Expand All @@ -15,7 +16,21 @@
namespace NKikimr {
namespace NSequenceProxy {

TSequenceProxyCounters::TSequenceProxyCounters() {
auto group = GetServiceCounters(AppData()->Counters, "proxy");
SequenceShardAllocateCount = group->GetHistogram(
"SequenceProxy/SequenceShard/AllocateCountPerRequest",
NMonitoring::ExponentialHistogram(20, 2, 1));

ErrorsCount = group->GetCounter("SequenceProxy/Errors", true);
RequestCount = group->GetCounter("SequenceProxy/Requests", true);
ResponseCount = group->GetCounter("SequenceProxy/Responses", true);
NextValLatency = group->GetHistogram("SequenceProxy/Latency",
NMonitoring::ExponentialHistogram(20, 2, 1));
};

void TSequenceProxy::Bootstrap() {
Counters.Reset(new TSequenceProxyCounters());
LogPrefix = TStringBuilder() << "TSequenceProxy [Node " << SelfId().NodeId() << "] ";
Become(&TThis::StateWork);
}
Expand All @@ -30,13 +45,29 @@ namespace NSequenceProxy {
request.Sender = ev->Sender;
request.Cookie = ev->Cookie;
request.UserToken = std::move(msg->UserToken);
request.StartAt = AppData()->MonotonicTimeProvider->Now();
std::visit(
[&](const auto& path) {
DoNextVal(std::move(request), msg->Database, path);
},
msg->Path);
}

void TSequenceProxy::Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
Counters->ResponseCount->Inc();
auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds();
Counters->NextValLatency->Collect(milliseconds);
Counters->ErrorsCount->Inc();
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
}

void TSequenceProxy::Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value) {
Counters->ResponseCount->Inc();
auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds();
Counters->NextValLatency->Collect(milliseconds);
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, value), 0, request.Cookie);
}

void TSequenceProxy::MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info) {
if (!info.ResolveInProgress && !info.NewNextValResolve.empty()) {
info.PendingNextValResolve = std::move(info.NewNextValResolve);
Expand All @@ -46,12 +77,15 @@ namespace NSequenceProxy {
}

void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path) {
Counters->RequestCount->Inc();
auto& info = Databases[database].SequenceByName[path];
info.NewNextValResolve.emplace_back(std::move(request));
MaybeStartResolve(database, path, info);
}

void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh) {
Counters->RequestCount->Inc();

auto& info = Databases[database].SequenceByPathId[pathId];
if (!info.ResolveInProgress && (needRefresh || !info.SequenceInfo)) {
StartResolve(database, pathId, !info.SequenceInfo);
Expand All @@ -77,14 +111,13 @@ namespace NSequenceProxy {
OnChanged(database, pathId, info);
}

void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
auto& info = Databases[database].SequenceByName[path];
Y_ABORT_UNLESS(info.ResolveInProgress);
info.ResolveInProgress = false;

while (!info.PendingNextValResolve.empty()) {
const auto& request = info.PendingNextValResolve.front();
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
Reply(info.PendingNextValResolve.front(), status, issues);
info.PendingNextValResolve.pop_front();
}

Expand All @@ -111,14 +144,13 @@ namespace NSequenceProxy {
MaybeStartResolve(database, path, info);
}

void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
auto& info = Databases[database].SequenceByPathId[pathId];
Y_ABORT_UNLESS(info.ResolveInProgress);
info.ResolveInProgress = false;

while (!info.PendingNextValResolve.empty()) {
const auto& request = info.PendingNextValResolve.front();
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
Reply(info.PendingNextValResolve.front(), status, issues);
info.PendingNextValResolve.pop_front();
}
}
Expand All @@ -144,7 +176,7 @@ namespace NSequenceProxy {
info.PendingNextVal.emplace_back(std::move(request));
++info.TotalRequested;
}
resolved.pop_back();
resolved.pop_front();
}

OnChanged(database, pathId, info);
Expand Down Expand Up @@ -173,8 +205,7 @@ namespace NSequenceProxy {
} else {
// We will answer up to cache requests with this error
while (cache > 0 && !info.PendingNextVal.empty()) {
const auto& request = info.PendingNextVal.front();
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(msg->Status, msg->Issues), 0, request.Cookie);
Reply(info.PendingNextVal.front(), msg->Status, msg->Issues);
info.PendingNextVal.pop_front();
--info.TotalRequested;
--cache;
Expand Down Expand Up @@ -209,7 +240,7 @@ namespace NSequenceProxy {
<< "Access denied for " << request.UserToken->GetUserSID() << " to sequence " << pathId;
NYql::TIssueManager issueManager;
issueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, error));
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues()));
Reply(request, Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues());
return true;
}
}
Expand All @@ -226,7 +257,7 @@ namespace NSequenceProxy {
Y_ABORT_UNLESS(!info.CachedAllocations.empty());
auto& front = info.CachedAllocations.front();
Y_ABORT_UNLESS(front.Count > 0);
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, front.Start), 0, request.Cookie);
Reply(request, pathId, front.Start);
--info.TotalCached;
if (--front.Count > 0) {
front.Start += front.Increment;
Expand Down
22 changes: 20 additions & 2 deletions ydb/core/tx/sequenceproxy/sequenceproxy_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,26 @@
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/sequenceproxy/public/events.h>

#include <ydb/core/base/counters.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/monotonic_provider.h>

namespace NKikimr {
namespace NSequenceProxy {

struct TSequenceProxyCounters : TAtomicRefCount<TSequenceProxyCounters> {
::NMonitoring::TDynamicCounters::TCounterPtr RequestCount;
::NMonitoring::TDynamicCounters::TCounterPtr ResponseCount;
::NMonitoring::TDynamicCounters::TCounterPtr ErrorsCount;

::NMonitoring::THistogramPtr SequenceShardAllocateCount;
::NMonitoring::THistogramPtr NextValLatency;

TSequenceProxyCounters();
};

class TSequenceProxy : public TActorBootstrapped<TSequenceProxy> {
public:
TSequenceProxy(const TSequenceProxySettings& settings)
Expand Down Expand Up @@ -58,6 +72,7 @@ namespace NSequenceProxy {
TActorId Sender;
ui64 Cookie;
TIntrusivePtr<NACLib::TUserToken> UserToken;
TMonotonic StartAt;
};

struct TCachedAllocation {
Expand Down Expand Up @@ -127,13 +142,15 @@ namespace NSequenceProxy {
void Handle(TEvPrivate::TEvAllocateResult::TPtr& ev);
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);

void Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
void Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value);
ui64 StartResolve(const TString& database, const std::variant<TString, TPathId>& path, bool syncVersion);
ui64 StartAllocate(ui64 tabletId, const TString& database, const TPathId& pathId, ui64 cache);
void MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info);
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path);
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh = true);
void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues);
void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues);
void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
void OnResolveResult(const TString& database, const TString& path, TResolveResult&& result);
void OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result);
void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved);
Expand All @@ -148,6 +165,7 @@ namespace NSequenceProxy {
THashMap<ui64, TResolveInFlight> ResolveInFlight;
THashMap<ui64, TAllocateInFlight> AllocateInFlight;
ui64 LastCookie = 0;
TIntrusivePtr<TSequenceProxyCounters> Counters;
};

} // namespace NSequenceProxy
Expand Down

0 comments on commit cf7b042

Please sign in to comment.