Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix sequence requests lost and add debugging tools to sequence proxy #570

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class TKqpSequencerActor : public NActors::TActorBootstrapped<TKqpSequencerActor
finished = (status == NUdf::EFetchStatus::Finish)
&& (UnprocessedRows == 0);

if (WaitingReplies == 0) {
if (PendingRows.size() > 0 && WaitingReplies == 0) {
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/session_actor/kqp_tx.h>

#include <ydb/library/actors/core/monotonic_provider.h>

#include <util/generic/noncopyable.h>
#include <util/generic/string.h>

Expand All @@ -33,7 +35,7 @@ class TKqpQueryState : public TNonCopyable {
TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database,
const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
NWilson::TTraceId&& traceId, const TString& sessionId)
NWilson::TTraceId&& traceId, const TString& sessionId, TMonotonic startedAt)
: QueryId(queryId)
, Database(database)
, Cluster(cluster)
Expand All @@ -46,6 +48,7 @@ class TKqpQueryState : public TNonCopyable {
, StartTime(TInstant::Now())
, KeepSession(ev->Get()->GetKeepSession() || longSession)
, UserToken(ev->Get()->GetUserToken())
, StartedAt(startedAt)
{
RequestEv.reset(ev->Release().Release());

Expand Down Expand Up @@ -98,6 +101,7 @@ class TKqpQueryState : public TNonCopyable {
NKqpProto::TKqpStatsQuery Stats;
bool KeepSession = false;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
NActors::TMonotonic StartedAt;

THashMap<NKikimr::TTableId, ui64> TableVersions;

Expand Down
11 changes: 7 additions & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
ev->Get()->SetClientLostAction(selfId, as);
QueryState = std::make_shared<TKqpQueryState>(
ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession,
Settings.TableService, Settings.QueryService, std::move(id), SessionId);
Settings.TableService, Settings.QueryService, std::move(id), SessionId,
AppData()->MonotonicTimeProvider->Now());
if (QueryState->UserRequestContext->TraceId.empty()) {
QueryState->UserRequestContext->TraceId = UlidGen.Next().ToString();
}
Expand Down Expand Up @@ -1309,10 +1310,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId);

TString reason = TStringBuilder() << "Request timeout exceeded, cancelling after "
<< (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds()
<< " milliseconds.";

if (ExecuterId) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(
msg.GetStatusCode(),
"Request timeout exceeded");
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), reason);
Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery);
} else {
const auto& issues = ev->Get()->GetIssues();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ namespace NSequenceProxy {
auto& info = AllocateInFlight[cookie];
info.Database = database;
info.PathId = pathId;
Counters->SequenceShardAllocateCount->Collect(cache);
Register(new TAllocateActor(SelfId(), cookie, tabletId, pathId, cache));
return cookie;
}
Expand Down
53 changes: 42 additions & 11 deletions ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "sequenceproxy_impl.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/library/yql/public/issue/yql_issue_manager.h>

Expand All @@ -15,7 +16,21 @@
namespace NKikimr {
namespace NSequenceProxy {

TSequenceProxyCounters::TSequenceProxyCounters() {
auto group = GetServiceCounters(AppData()->Counters, "proxy");
SequenceShardAllocateCount = group->GetHistogram(
"SequenceProxy/SequenceShard/AllocateCountPerRequest",
NMonitoring::ExponentialHistogram(20, 2, 1));

ErrorsCount = group->GetCounter("SequenceProxy/Errors", true);
RequestCount = group->GetCounter("SequenceProxy/Requests", true);
ResponseCount = group->GetCounter("SequenceProxy/Responses", true);
NextValLatency = group->GetHistogram("SequenceProxy/Latency",
NMonitoring::ExponentialHistogram(20, 2, 1));
};

void TSequenceProxy::Bootstrap() {
Counters.Reset(new TSequenceProxyCounters());
LogPrefix = TStringBuilder() << "TSequenceProxy [Node " << SelfId().NodeId() << "] ";
Become(&TThis::StateWork);
}
Expand All @@ -30,13 +45,29 @@ namespace NSequenceProxy {
request.Sender = ev->Sender;
request.Cookie = ev->Cookie;
request.UserToken = std::move(msg->UserToken);
request.StartAt = AppData()->MonotonicTimeProvider->Now();
std::visit(
[&](const auto& path) {
DoNextVal(std::move(request), msg->Database, path);
},
msg->Path);
}

void TSequenceProxy::Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
Counters->ResponseCount->Inc();
auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds();
Counters->NextValLatency->Collect(milliseconds);
Counters->ErrorsCount->Inc();
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
}

void TSequenceProxy::Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value) {
Counters->ResponseCount->Inc();
auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds();
Counters->NextValLatency->Collect(milliseconds);
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, value), 0, request.Cookie);
}

void TSequenceProxy::MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info) {
if (!info.ResolveInProgress && !info.NewNextValResolve.empty()) {
info.PendingNextValResolve = std::move(info.NewNextValResolve);
Expand All @@ -46,12 +77,15 @@ namespace NSequenceProxy {
}

void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path) {
Counters->RequestCount->Inc();
auto& info = Databases[database].SequenceByName[path];
info.NewNextValResolve.emplace_back(std::move(request));
MaybeStartResolve(database, path, info);
}

void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh) {
Counters->RequestCount->Inc();

auto& info = Databases[database].SequenceByPathId[pathId];
if (!info.ResolveInProgress && (needRefresh || !info.SequenceInfo)) {
StartResolve(database, pathId, !info.SequenceInfo);
Expand All @@ -77,14 +111,13 @@ namespace NSequenceProxy {
OnChanged(database, pathId, info);
}

void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
auto& info = Databases[database].SequenceByName[path];
Y_ABORT_UNLESS(info.ResolveInProgress);
info.ResolveInProgress = false;

while (!info.PendingNextValResolve.empty()) {
const auto& request = info.PendingNextValResolve.front();
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
Reply(info.PendingNextValResolve.front(), status, issues);
info.PendingNextValResolve.pop_front();
}

Expand All @@ -111,14 +144,13 @@ namespace NSequenceProxy {
MaybeStartResolve(database, path, info);
}

void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
auto& info = Databases[database].SequenceByPathId[pathId];
Y_ABORT_UNLESS(info.ResolveInProgress);
info.ResolveInProgress = false;

while (!info.PendingNextValResolve.empty()) {
const auto& request = info.PendingNextValResolve.front();
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
Reply(info.PendingNextValResolve.front(), status, issues);
info.PendingNextValResolve.pop_front();
}
}
Expand All @@ -144,7 +176,7 @@ namespace NSequenceProxy {
info.PendingNextVal.emplace_back(std::move(request));
++info.TotalRequested;
}
resolved.pop_back();
resolved.pop_front();
}

OnChanged(database, pathId, info);
Expand Down Expand Up @@ -173,8 +205,7 @@ namespace NSequenceProxy {
} else {
// We will answer up to cache requests with this error
while (cache > 0 && !info.PendingNextVal.empty()) {
const auto& request = info.PendingNextVal.front();
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(msg->Status, msg->Issues), 0, request.Cookie);
Reply(info.PendingNextVal.front(), msg->Status, msg->Issues);
info.PendingNextVal.pop_front();
--info.TotalRequested;
--cache;
Expand Down Expand Up @@ -209,7 +240,7 @@ namespace NSequenceProxy {
<< "Access denied for " << request.UserToken->GetUserSID() << " to sequence " << pathId;
NYql::TIssueManager issueManager;
issueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, error));
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues()));
Reply(request, Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues());
return true;
}
}
Expand All @@ -226,7 +257,7 @@ namespace NSequenceProxy {
Y_ABORT_UNLESS(!info.CachedAllocations.empty());
auto& front = info.CachedAllocations.front();
Y_ABORT_UNLESS(front.Count > 0);
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, front.Start), 0, request.Cookie);
Reply(request, pathId, front.Start);
--info.TotalCached;
if (--front.Count > 0) {
front.Start += front.Increment;
Expand Down
22 changes: 20 additions & 2 deletions ydb/core/tx/sequenceproxy/sequenceproxy_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,26 @@
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/sequenceproxy/public/events.h>

#include <ydb/core/base/counters.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/monotonic_provider.h>

namespace NKikimr {
namespace NSequenceProxy {

struct TSequenceProxyCounters : TAtomicRefCount<TSequenceProxyCounters> {
::NMonitoring::TDynamicCounters::TCounterPtr RequestCount;
::NMonitoring::TDynamicCounters::TCounterPtr ResponseCount;
::NMonitoring::TDynamicCounters::TCounterPtr ErrorsCount;

::NMonitoring::THistogramPtr SequenceShardAllocateCount;
::NMonitoring::THistogramPtr NextValLatency;

TSequenceProxyCounters();
};

class TSequenceProxy : public TActorBootstrapped<TSequenceProxy> {
public:
TSequenceProxy(const TSequenceProxySettings& settings)
Expand Down Expand Up @@ -58,6 +72,7 @@ namespace NSequenceProxy {
TActorId Sender;
ui64 Cookie;
TIntrusivePtr<NACLib::TUserToken> UserToken;
TMonotonic StartAt;
};

struct TCachedAllocation {
Expand Down Expand Up @@ -127,13 +142,15 @@ namespace NSequenceProxy {
void Handle(TEvPrivate::TEvAllocateResult::TPtr& ev);
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);

void Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
void Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value);
ui64 StartResolve(const TString& database, const std::variant<TString, TPathId>& path, bool syncVersion);
ui64 StartAllocate(ui64 tabletId, const TString& database, const TPathId& pathId, ui64 cache);
void MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info);
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path);
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh = true);
void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues);
void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues);
void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
void OnResolveResult(const TString& database, const TString& path, TResolveResult&& result);
void OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result);
void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved);
Expand All @@ -148,6 +165,7 @@ namespace NSequenceProxy {
THashMap<ui64, TResolveInFlight> ResolveInFlight;
THashMap<ui64, TAllocateInFlight> AllocateInFlight;
ui64 LastCookie = 0;
TIntrusivePtr<TSequenceProxyCounters> Counters;
};

} // namespace NSequenceProxy
Expand Down