Skip to content

Commit

Permalink
fix naming conflict and remove unnecessary test (ydb-platform#4381)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored May 8, 2024
1 parent b139f2a commit 0bf726c
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 60 deletions.
13 changes: 7 additions & 6 deletions ydb/core/persqueue/read_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ class TBalancer;
}


struct TPartitionInfo {
ui64 TabletId;
NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
};


class TMetricsTimeKeeper {
public:
TMetricsTimeKeeper(NMetrics::TResourceMetrics* metrics, const TActorContext& ctx)
Expand Down Expand Up @@ -177,6 +171,13 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
std::vector<TEvPersQueue::TEvCheckACL::TPtr> WaitingACLRequests;
std::vector<TEvPersQueue::TEvDescribe::TPtr> WaitingDescribeRequests;

public:
struct TPartitionInfo {
ui64 TabletId;
NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
};

private:
std::unordered_map<ui32, TPartitionInfo> PartitionsInfo;

struct TTabletInfo {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/read_balancer__balancing.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace NKikimr::NPQ::NBalancing {

using namespace NTabletFlatExecutor;

using TPartitionInfo = TPersQueueReadBalancer::TPartitionInfo;

struct TSession;
struct TConsumer;
class TBalancer;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/read_balancer__txinit.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
return false;
}

std::map<ui32, TPartitionInfo> partitionsInfo;
std::map<ui32, TPersQueueReadBalancer::TPartitionInfo> partitionsInfo;
while (!partsRowset.EndOfSet()) { //found out tablets for partitions
++Self->NumActiveParts;
ui32 part = partsRowset.GetValue<Schema::Partitions::Partition>();
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/persqueue/ut/common/pq_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,8 @@ void WaitPartition(const TString &session, TTestContext& tc, ui32 partition, con
UNIT_ASSERT_EQUAL(result->Record.GetSession(), sessionToRelease);
UNIT_ASSERT(ok);

THolder<TEvPersQueue::TEvPartitionReleased> request;
auto request = MakeHolder<TEvPersQueue::TEvPartitionReleased>();

request.Reset(new TEvPersQueue::TEvPartitionReleased);
auto& req = request->Record;
req.SetSession(sessionToRelease);
req.SetPartition(partition);
Expand All @@ -381,9 +380,9 @@ void WaitPartition(const TString &session, TTestContext& tc, ui32 partition, con
tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries(), pipe);
}
} catch (NActors::TSchedulingLimitReachedException) {
UNIT_ASSERT(i < 2 || !ok);
UNIT_ASSERT_C(i < 2 || !ok, "TSchedulingLimitReachedException i=" << i << " ok=" << ok);
} catch (NActors::TEmptyEventQueueException) {
UNIT_ASSERT(i < 2 || !ok);
UNIT_ASSERT_C(i < 2 || !ok, "TEmptyEventQueueException i=" << i << " ok=" << ok);
}
}
}
Expand Down
49 changes: 0 additions & 49 deletions ydb/core/persqueue/ut/pq_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,55 +459,6 @@ Y_UNIT_TEST(TestReadRuleVersions) {
});
}

Y_UNIT_TEST(TestCreateBalancer) {
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {
return tc.InitialEventsFilter.Prepare();
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
TFinalizer finalizer(tc);
tc.Prepare(dispatchName, setup, activeZone);
activeZone = false;
tc.Runtime->SetScheduledLimit(50);
tc.Runtime->SetDispatchTimeout(TDuration::MilliSeconds(100));

TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()};
ui64 ssId = 325;
BootFakeSchemeShard(*tc.Runtime, ssId, state);

PQBalancerPrepare(TOPIC_NAME, {{1,{1,2}}}, ssId, tc);

TActorId pipe1 = RegisterReadSession("session0", tc, {1});

PQBalancerPrepare(TOPIC_NAME, {{1,{1,2}}, {2,{1,3}}}, ssId, tc);

tc.Runtime->Send(new IEventHandle(pipe1, tc.Edge, new TEvents::TEvPoisonPill()), 0, true); //will cause dying of pipe and first session


// PQBalancerPrepare(TOPIC_NAME, {{2,1}}, tc); //TODO: not supported yet
// PQBalancerPrepare(TOPIC_NAME, {{1,1}}, tc); // TODO: not supported yet
PQBalancerPrepare(TOPIC_NAME, {{1,{1, 2}}, {2,{1, 3}}, {3,{1, 4}}}, ssId, tc);
activeZone = false;

TActorId pipe = RegisterReadSession("session1", tc);
WaitPartition("session1", tc, 0, "", "", TActorId());
WaitPartition("session1", tc, 0, "", "", TActorId());
WaitPartition("session1", tc, 0, "", "", TActorId());
WaitPartition("", tc, 0, "", "", TActorId(), false);//no partitions - return error
TActorId pipe2 = RegisterReadSession("session2", tc);
Y_UNUSED(pipe2);
WaitPartition("session2", tc, 1, "session1", "topic1", pipe);
WaitPartition("", tc, 0, "", "", TActorId(), false);//no partitions to balance
tc.Runtime->Send(new IEventHandle(pipe, tc.Edge, new TEvents::TEvPoisonPill()), 0, true); //will cause dying of pipe and first session

TDispatchOptions options;
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTabletPipe::EvServerDisconnected));
tc.Runtime->DispatchEvents(options);
WaitPartition("session2", tc, 0, "", "", TActorId());
WaitPartition("session2", tc, 0, "", "", TActorId());
WaitPartition("", tc, 0, "", "", TActorId(), false);//no partitions to balance
});
}

Y_UNIT_TEST(TestDescribeBalancer) {
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {
Expand Down

0 comments on commit 0bf726c

Please sign in to comment.