Skip to content

Commit

Permalink
TDbResolver: store & resend original event (ydb-platform#14883)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Feb 21, 2025
1 parent 147c972 commit f330cca
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 30 deletions.
31 changes: 15 additions & 16 deletions ydb/core/tx/scheme_board/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ namespace {
SetError(context, entry, TResolve::EStatus::LookupError, TKeyDesc::EStatus::NotExists);
}

template <typename TRequest, typename TEvRequest, typename TDerived>
template <typename TEvRequest, typename TDerived>
class TDbResolver: public TActorBootstrapped<TDerived> {
void Handle() {
TlsActivationContext->Send(new IEventHandle(Cache, Sender, new TEvRequest(Request.Release())));
Request->Rewrite(Request->GetTypeRewrite(), Cache);
this->Send(Request.Release());
this->PassAway();
}

Expand All @@ -112,17 +113,16 @@ namespace {
return NKikimrServices::TActivity::SCHEME_BOARD_DB_RESOLVER;
}

TDbResolver(const TActorId& cache, const TActorId& sender, THolder<TRequest> request, ui64 domainOwnerId)
TDbResolver(const TActorId& cache, typename TEvRequest::TPtr& request, ui64 domainOwnerId)
: Cache(cache)
, Sender(sender)
, Request(std::move(request))
, Request(request)
, DomainOwnerId(domainOwnerId)
{
}

void Bootstrap() {
TNavigate::TEntry entry;
entry.Path = SplitPath(Request->DatabaseName);
entry.Path = SplitPath(Request->Get()->Request->DatabaseName);
entry.Operation = TNavigate::EOp::OpPath;
entry.RedirectRequired = false;

Expand All @@ -140,32 +140,31 @@ namespace {
}
}

using TBase = TDbResolver<TRequest, TEvRequest, TDerived>;
using TBase = TDbResolver<TEvRequest, TDerived>;

private:
const TActorId Cache;
const TActorId Sender;
THolder<TRequest> Request;
typename TEvRequest::TPtr Request;
const ui64 DomainOwnerId;

}; // TDbResolver

class TDbResolverNavigate: public TDbResolver<TNavigate, TEvNavigate, TDbResolverNavigate> {
class TDbResolverNavigate: public TDbResolver<TEvNavigate, TDbResolverNavigate> {
public:
using TBase::TBase;
};

class TDbResolverResolve: public TDbResolver<TResolve, TEvResolve, TDbResolverResolve> {
class TDbResolverResolve: public TDbResolver<TEvResolve, TDbResolverResolve> {
public:
using TBase::TBase;
};

IActor* CreateDbResolver(const TActorId& cache, const TActorId& sender, THolder<TNavigate> request, ui64 domainOwnerId) {
return new TDbResolverNavigate(cache, sender, std::move(request), domainOwnerId);
IActor* CreateDbResolver(const TActorId& cache, TEvNavigate::TPtr& request, ui64 domainOwnerId) {
return new TDbResolverNavigate(cache, request, domainOwnerId);
}

IActor* CreateDbResolver(const TActorId& cache, const TActorId& sender, THolder<TResolve> request, ui64 domainOwnerId) {
return new TDbResolverResolve(cache, sender, std::move(request), domainOwnerId);
IActor* CreateDbResolver(const TActorId& cache, TEvResolve::TPtr& request, ui64 domainOwnerId) {
return new TDbResolverResolve(cache, request, domainOwnerId);
}

template <typename TContextPtr, typename TEvResult, typename TDerived>
Expand Down Expand Up @@ -2618,7 +2617,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
return false;
}

Register(CreateDbResolver(SelfId(), ev->Sender, THolder(request.Release()), it->second));
Register(CreateDbResolver(SelfId(), ev, it->second));
return true;
}

Expand Down
46 changes: 32 additions & 14 deletions ydb/core/tx/scheme_board/cache_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,13 @@ class TCacheTest: public TTestWithSchemeshard {
SchemeCache = Context->Register(CreateSchemeBoardSchemeCache(config.Get()));
Context->EnableScheduleForActor(SchemeCache, true);

TestAlterSubDomain(*Context, 1, "/",
"StoragePools { "
" Name: \"pool-1\" "
" Kind: \"pool-kind-1\" "
"} "
" Name: \"Root\" ");

// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
// Context->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NLog::PRI_DEBUG);
TestAlterSubDomain(*Context, 1, "/", R"(
Name: "Root"
StoragePools {
Name: "pool-1"
Kind: "pool-kind-1"
}
)");
}

UNIT_TEST_SUITE(TCacheTest);
Expand All @@ -63,6 +59,7 @@ class TCacheTest: public TTestWithSchemeshard {
UNIT_TEST(MigrationDeletedPathNavigate);
UNIT_TEST(WatchRoot);
UNIT_TEST(PathBelongsToDomain);
UNIT_TEST(CookiesArePreserved);
UNIT_TEST_SUITE_END();

void Navigate();
Expand All @@ -83,10 +80,11 @@ class TCacheTest: public TTestWithSchemeshard {
void MigrationDeletedPathNavigate();
void WatchRoot();
void PathBelongsToDomain();
void CookiesArePreserved();

protected:
TNavigate::TEntry TestNavigateImpl(THolder<TNavigate> request, TNavigate::EStatus expectedStatus,
const TString& sid, TNavigate::EOp op, bool showPrivatePath, bool redirectRequired);
const TString& sid, TNavigate::EOp op, bool showPrivatePath, bool redirectRequired, ui64 cookie = 0);

TNavigate::TEntry TestNavigate(const TString& path, TNavigate::EStatus expectedStatus = TNavigate::EStatus::Ok,
const TString& sid = TString(), TNavigate::EOp op = TNavigate::EOp::OpPath,
Expand Down Expand Up @@ -374,7 +372,7 @@ void TCacheTest::TableSchemaVersion() {
}

TNavigate::TEntry TCacheTest::TestNavigateImpl(THolder<TNavigate> request, TNavigate::EStatus expectedStatus,
const TString& sid, TNavigate::EOp op, bool showPrivatePath, bool redirectRequired)
const TString& sid, TNavigate::EOp op, bool showPrivatePath, bool redirectRequired, ui64 cookie)
{
auto& entry = request->ResultSet.back();
entry.Operation = op;
Expand All @@ -386,10 +384,11 @@ TNavigate::TEntry TCacheTest::TestNavigateImpl(THolder<TNavigate> request, TNavi
}

const TActorId edge = Context->AllocateEdgeActor();
Context->Send(SchemeCache, edge, new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()), 0, 0, 0, true);
Context->Send(SchemeCache, edge, new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()), 0, cookie, 0, true);
auto ev = Context->GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(edge);

UNIT_ASSERT(ev->Get());
UNIT_ASSERT_VALUES_EQUAL(ev->Cookie, cookie);
UNIT_ASSERT(!ev->Get()->Request->ResultSet.empty());

const TNavigate::TEntry result = ev->Get()->Request->ResultSet[0];
Expand Down Expand Up @@ -997,6 +996,25 @@ void TCacheTest::PathBelongsToDomain() {
}
}

void TCacheTest::CookiesArePreserved() {
ui64 txId = 100;
TestCreateSubDomain(*Context, ++txId, "/Root", R"(Name: "SubDomain")");
TestWaitNotification(*Context, {txId}, CreateNotificationSubscriber(*Context, TTestTxConfig::SchemeShard));
TestMkDir(*Context, ++txId, "/Root/SubDomain", "DirA");

ui64 cookie = 1;
// first request will run db resolver
for (int i = 0; i < 2; ++i) {
auto request = MakeHolder<TNavigate>();
request->DatabaseName = "/Root/SubDomain";
auto& entry = request->ResultSet.emplace_back();
entry.Path = SplitPath("/Root/SubDomain/DirA");
entry.RequestType = TNavigate::TEntry::ERequestType::ByPath;
auto result = TestNavigateImpl(std::move(request), TNavigate::EStatus::Ok,
"", TNavigate::EOp::OpPath, false, true, ++cookie);
}
}

class TCacheTestWithDrops: public TCacheTest {
public:
TTestContext::TEventObserver ObserverFunc() override {
Expand Down

0 comments on commit f330cca

Please sign in to comment.