Skip to content

Commit

Permalink
clean more
Browse files Browse the repository at this point in the history
  • Loading branch information
lferran committed Mar 7, 2025
1 parent 32a128b commit 6aec80a
Show file tree
Hide file tree
Showing 10 changed files with 7 additions and 123 deletions.
20 changes: 2 additions & 18 deletions nucliadb/src/nucliadb/common/cluster/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from abc import ABCMeta, abstractmethod
from typing import AsyncIterator, Optional
from typing import AsyncIterator

from nucliadb_protos import nodereader_pb2, noderesources_pb2, utils_pb2
from nucliadb_protos.nodereader_pb2_grpc import NodeReaderStub
Expand All @@ -38,31 +38,15 @@ def __init__(
self,
*,
id: str,
address: str,
shard_count: int,
available_disk: int,
dummy: bool = False,
primary_id: Optional[str] = None,
):
self.id = id
self.address = address
self.shard_count = shard_count
self.available_disk = available_disk
self.dummy = dummy
self.primary_id = primary_id

def __str__(self):
if self.primary_id is None:
return f"{self.__class__.__name__}({self.id}, {self.address})"
else:
return f"{self.__class__.__name__}({self.id}, {self.address}, primary_id={self.primary_id})"
return f"{self.__class__.__name__}({self.id}"

def __repr__(self):
return self.__str__()

def is_read_replica(self) -> bool:
return self.primary_id is not None

@property
@abstractmethod
def reader(self) -> NodeReaderStub: # pragma: no cover
Expand Down
15 changes: 2 additions & 13 deletions nucliadb/src/nucliadb/common/cluster/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import logging
from typing import TYPE_CHECKING, Optional, Union
from typing import Optional, Union

import backoff

Expand All @@ -28,22 +28,16 @@
StandaloneKBShardManager,
)
from nucliadb.common.cluster.settings import settings
from nucliadb.common.context import ApplicationContext
from nucliadb.ingest.orm.resource import Resource
from nucliadb_protos import nodereader_pb2, writer_pb2
from nucliadb_utils.utilities import Utility, clean_utility, get_utility, set_utility

if TYPE_CHECKING: # pragma: no cover
from nucliadb.common.context import ApplicationContext
else:
ApplicationContext = None

logger = logging.getLogger(__name__)


_lock = asyncio.Lock()

_STANDALONE_SERVER = "_standalone_service"


async def setup_cluster() -> Union[KBShardManager, StandaloneKBShardManager]:
async with _lock:
Expand All @@ -64,11 +58,6 @@ async def teardown_cluster():
if get_utility(Utility.SHARD_MANAGER):
clean_utility(Utility.SHARD_MANAGER)

std_server = get_utility(_STANDALONE_SERVER)
if std_server is not None:
await std_server.stop(None)
clean_utility(_STANDALONE_SERVER)


def get_shard_manager() -> KBShardManager:
return get_utility(Utility.SHARD_MANAGER) # type: ignore
Expand Down
11 changes: 0 additions & 11 deletions nucliadb/src/nucliadb/common/nidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,21 +278,10 @@ def reader(self):
def writer(self):
return self.client

def is_read_replica(_):
return False

@property
def id(self):
return "nidx"

@property
def address(self):
return "nidx"

@property
def primary_id(self):
return "nidx"


def get_nidx_fake_node() -> FakeNode:
nidx = get_nidx()
Expand Down
22 changes: 0 additions & 22 deletions nucliadb/src/nucliadb/ingest/service/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from nucliadb.backups import utils as backup_utils
from nucliadb.common import datamanagers
from nucliadb.common.cluster.exceptions import AlreadyExists, EntitiesGroupNotFound
from nucliadb.common.cluster.manager import get_nidx_fake_node
from nucliadb.common.cluster.utils import get_shard_manager
from nucliadb.common.datamanagers.exceptions import KnowledgeBoxNotFound
from nucliadb.common.external_index_providers.exceptions import ExternalIndexCreationError
Expand Down Expand Up @@ -58,8 +57,6 @@
IndexStatus,
ListEntitiesGroupsRequest,
ListEntitiesGroupsResponse,
ListMembersRequest,
ListMembersResponse,
NewEntitiesGroupRequest,
NewEntitiesGroupResponse,
OpStatusWriter,
Expand Down Expand Up @@ -406,25 +403,6 @@ async def Status( # type: ignore

return response

async def ListMembers( # type: ignore
self, request: ListMembersRequest, context=None
) -> ListMembersResponse:
response = ListMembersResponse()
response.members.extend(
[
writer_pb2.Member(
id=n.id,
listen_address=n.address,
is_self=False,
dummy=n.dummy,
shard_count=n.shard_count,
primary_id=n.primary_id or "",
)
for n in [get_nidx_fake_node()]
]
)
return response

async def Index(self, request: IndexResource, context=None) -> IndexStatus: # type: ignore
async with self.driver.transaction() as txn:
kbobj = KnowledgeBoxORM(txn, self.storage, request.kbid)
Expand Down
4 changes: 1 addition & 3 deletions nucliadb/src/nucliadb/search/requesters/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,7 @@ def debug_nodes_info(nodes: list[tuple[AbstractIndexNode, str]]) -> list[dict[st
info = {
"id": node.id,
"shard_id": shard_id,
"address": node.address,
"address": "nidx",
}
if node.primary_id:
info["primary_id"] = node.primary_id
details.append(info)
return details
2 changes: 0 additions & 2 deletions nucliadb/tests/search/unit/search/requesters/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ def shard_manager():
@pytest.fixture()
def faulty_search_methods():
def fake_search(node: AbstractIndexNode, shard: str, query: nodereader_pb2.SearchRequest):
if node.is_read_replica():
raise Exception()
return nodereader_pb2.SearchResponse()

faulty_methods = {utils.Method.SEARCH: AsyncMock(side_effect=fake_search)}
Expand Down
4 changes: 2 additions & 2 deletions nucliadb_protos/python/src/nucliadb_protos/writer_pb2.py

Large diffs are not rendered by default.

33 changes: 0 additions & 33 deletions nucliadb_protos/python/src/nucliadb_protos/writer_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ def __init__(self, channel):
request_serializer=nucliadb__protos_dot_writer__pb2.WriterStatusRequest.SerializeToString,
response_deserializer=nucliadb__protos_dot_writer__pb2.WriterStatusResponse.FromString,
)
self.ListMembers = channel.unary_unary(
'/fdbwriter.Writer/ListMembers',
request_serializer=nucliadb__protos_dot_writer__pb2.ListMembersRequest.SerializeToString,
response_deserializer=nucliadb__protos_dot_writer__pb2.ListMembersResponse.FromString,
)
self.Index = channel.unary_unary(
'/fdbwriter.Writer/Index',
request_serializer=nucliadb__protos_dot_writer__pb2.IndexResource.SerializeToString,
Expand Down Expand Up @@ -184,12 +179,6 @@ def Status(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def ListMembers(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def Index(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
Expand Down Expand Up @@ -283,11 +272,6 @@ def add_WriterServicer_to_server(servicer, server):
request_deserializer=nucliadb__protos_dot_writer__pb2.WriterStatusRequest.FromString,
response_serializer=nucliadb__protos_dot_writer__pb2.WriterStatusResponse.SerializeToString,
),
'ListMembers': grpc.unary_unary_rpc_method_handler(
servicer.ListMembers,
request_deserializer=nucliadb__protos_dot_writer__pb2.ListMembersRequest.FromString,
response_serializer=nucliadb__protos_dot_writer__pb2.ListMembersResponse.SerializeToString,
),
'Index': grpc.unary_unary_rpc_method_handler(
servicer.Index,
request_deserializer=nucliadb__protos_dot_writer__pb2.IndexResource.FromString,
Expand Down Expand Up @@ -527,23 +511,6 @@ def Status(request,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def ListMembers(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/fdbwriter.Writer/ListMembers',
nucliadb__protos_dot_writer__pb2.ListMembersRequest.SerializeToString,
nucliadb__protos_dot_writer__pb2.ListMembersResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def Index(request,
target,
Expand Down
17 changes: 0 additions & 17 deletions nucliadb_protos/python/src/nucliadb_protos/writer_pb2_grpc.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,6 @@ class WriterStub:
nucliadb_protos.writer_pb2.WriterStatusResponse,
]

ListMembers: grpc.UnaryUnaryMultiCallable[
nucliadb_protos.writer_pb2.ListMembersRequest,
nucliadb_protos.writer_pb2.ListMembersResponse,
]

Index: grpc.UnaryUnaryMultiCallable[
nucliadb_protos.writer_pb2.IndexResource,
nucliadb_protos.writer_pb2.IndexStatus,
Expand Down Expand Up @@ -343,11 +338,6 @@ class WriterAsyncStub:
nucliadb_protos.writer_pb2.WriterStatusResponse,
]

ListMembers: grpc.aio.UnaryUnaryMultiCallable[
nucliadb_protos.writer_pb2.ListMembersRequest,
nucliadb_protos.writer_pb2.ListMembersResponse,
]

Index: grpc.aio.UnaryUnaryMultiCallable[
nucliadb_protos.writer_pb2.IndexResource,
nucliadb_protos.writer_pb2.IndexStatus,
Expand Down Expand Up @@ -459,13 +449,6 @@ class WriterServicer(metaclass=abc.ABCMeta):
context: _ServicerContext,
) -> typing.Union[nucliadb_protos.writer_pb2.WriterStatusResponse, collections.abc.Awaitable[nucliadb_protos.writer_pb2.WriterStatusResponse]]: ...

@abc.abstractmethod
def ListMembers(
self,
request: nucliadb_protos.writer_pb2.ListMembersRequest,
context: _ServicerContext,
) -> typing.Union[nucliadb_protos.writer_pb2.ListMembersResponse, collections.abc.Awaitable[nucliadb_protos.writer_pb2.ListMembersResponse]]: ...

@abc.abstractmethod
def Index(
self,
Expand Down
2 changes: 0 additions & 2 deletions nucliadb_protos/writer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,6 @@ service Writer {

rpc Status(WriterStatusRequest) returns (WriterStatusResponse) {}

rpc ListMembers(ListMembersRequest) returns (ListMembersResponse);

rpc Index(IndexResource) returns (IndexStatus) {}
rpc ReIndex(IndexResource) returns (IndexStatus) {}

Expand Down

0 comments on commit 6aec80a

Please sign in to comment.