diff --git a/nucliadb/src/nucliadb/common/cluster/rebalance.py b/nucliadb/src/nucliadb/common/cluster/rebalance.py index 76f89ffaed..d33ab731ab 100644 --- a/nucliadb/src/nucliadb/common/cluster/rebalance.py +++ b/nucliadb/src/nucliadb/common/cluster/rebalance.py @@ -189,11 +189,13 @@ async def rebalance_kb(context: ApplicationContext, kbid: str) -> None: async def run(context: ApplicationContext) -> None: try: async with locking.distributed_lock(REBALANCE_LOCK): + # get all kb ids + async with datamanagers.with_ro_transaction() as txn: + kbids = [kbid async for kbid, _ in datamanagers.kb.get_kbs(txn)] # go through each kb and see if shards need to be reduced in size - async with datamanagers.with_transaction() as txn: - async for kbid, _ in datamanagers.kb.get_kbs(txn): - async with locking.distributed_lock(locking.KB_SHARDS_LOCK.format(kbid=kbid)): - await rebalance_kb(context, kbid) + for kbid in kbids: + async with locking.distributed_lock(locking.KB_SHARDS_LOCK.format(kbid=kbid)): + await rebalance_kb(context, kbid) except locking.ResourceLocked as exc: if exc.key == REBALANCE_LOCK: logger.warning("Another rebalance process is already running.") diff --git a/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py b/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py index fb5b3d8140..b2f91d5c63 100644 --- a/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py +++ b/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py @@ -236,15 +236,16 @@ async def txn( if len(messages) == 0: return None - txn = await self.driver.begin() kbid = messages[0].kbid - if not await datamanagers.kb.exists_kb(txn, kbid=kbid): + if not await datamanagers.atomic.kb.exists_kb(kbid=kbid): logger.info(f"KB {kbid} is deleted: skiping txn") if transaction_check: - await sequence_manager.set_last_seqid(txn, partition, seqid) - await txn.commit() + async with datamanagers.with_rw_transaction() as txn: + await sequence_manager.set_last_seqid(txn, partition, seqid) + await txn.commit() return None + txn = await self.driver.begin() try: multi = messages[0].multiid kb = KnowledgeBox(txn, self.storage, kbid) diff --git a/nucliadb/src/nucliadb/ingest/service/writer.py b/nucliadb/src/nucliadb/ingest/service/writer.py index 268bc847d2..651e07da73 100644 --- a/nucliadb/src/nucliadb/ingest/service/writer.py +++ b/nucliadb/src/nucliadb/ingest/service/writer.py @@ -209,12 +209,14 @@ async def NewEntitiesGroup( # type: ignore self, request: NewEntitiesGroupRequest, context=None ) -> NewEntitiesGroupResponse: response = NewEntitiesGroupResponse() - async with self.driver.transaction() as txn: - kbobj = await self.proc.get_kb_obj(txn, request.kb) + async with self.driver.transaction(read_only=True) as ro_txn: + kbobj = await self.proc.get_kb_obj(ro_txn, request.kb) if kbobj is None: response.status = NewEntitiesGroupResponse.Status.KB_NOT_FOUND return response + async with self.driver.transaction() as txn: + kbobj.txn = txn entities_manager = EntitiesManager(kbobj, txn) try: await entities_manager.create_entities_group(request.group, request.entities) @@ -232,7 +234,6 @@ async def GetEntities( # type: ignore response = GetEntitiesResponse() async with self.driver.transaction(read_only=True) as txn: kbobj = await self.proc.get_kb_obj(txn, request.kb) - if kbobj is None: response.status = GetEntitiesResponse.Status.NOTFOUND return response @@ -255,7 +256,6 @@ async def ListEntitiesGroups( # type: ignore response = ListEntitiesGroupsResponse() async with self.driver.transaction(read_only=True) as txn: kbobj = await self.proc.get_kb_obj(txn, request.kb) - if kbobj is None: response.status = ListEntitiesGroupsResponse.Status.NOTFOUND return response @@ -303,12 +303,14 @@ async def GetEntitiesGroup( # type: ignore async def SetEntities(self, request: SetEntitiesRequest, context=None) -> OpStatusWriter: # type: ignore response = OpStatusWriter() - async with self.driver.transaction() as txn: - kbobj = await self.proc.get_kb_obj(txn, request.kb) + async with self.driver.transaction(read_only=True) as ro_txn: + kbobj = await self.proc.get_kb_obj(ro_txn, request.kb) if kbobj is None: response.status = OpStatusWriter.Status.NOTFOUND return response + async with self.driver.transaction() as txn: + kbobj.txn = txn entities_manager = EntitiesManager(kbobj, txn) try: await entities_manager.set_entities_group(request.group, request.entities) @@ -325,14 +327,15 @@ async def UpdateEntitiesGroup( # type: ignore self, request: UpdateEntitiesGroupRequest, context=None ) -> UpdateEntitiesGroupResponse: response = UpdateEntitiesGroupResponse() - async with self.driver.transaction() as txn: - kbobj = await self.proc.get_kb_obj(txn, request.kb) + async with self.driver.transaction(read_only=True) as ro_txn: + kbobj = await self.proc.get_kb_obj(ro_txn, request.kb) if kbobj is None: response.status = UpdateEntitiesGroupResponse.Status.KB_NOT_FOUND return response + async with self.driver.transaction() as txn: + kbobj.txn = txn entities_manager = EntitiesManager(kbobj, txn) - try: await entities_manager.set_entities_group_metadata( request.group, @@ -352,12 +355,15 @@ async def UpdateEntitiesGroup( # type: ignore async def DelEntities(self, request: DelEntitiesRequest, context=None) -> OpStatusWriter: # type: ignore response = OpStatusWriter() - async with self.driver.transaction() as txn: - kbobj = await self.proc.get_kb_obj(txn, request.kb) + + async with self.driver.transaction(read_only=True) as ro_txn: + kbobj = await self.proc.get_kb_obj(ro_txn, request.kb) if kbobj is None: response.status = OpStatusWriter.Status.NOTFOUND return response + async with self.driver.transaction() as txn: + kbobj.txn = txn entities_manager = EntitiesManager(kbobj, txn) try: await entities_manager.delete_entities_group(request.group) diff --git a/nucliadb/src/nucliadb/migrator/datamanager.py b/nucliadb/src/nucliadb/migrator/datamanager.py index f6d140fc6e..b1f62fbdf0 100644 --- a/nucliadb/src/nucliadb/migrator/datamanager.py +++ b/nucliadb/src/nucliadb/migrator/datamanager.py @@ -47,8 +47,12 @@ def __init__(self, driver: Driver): self.driver = driver async def schedule_all_kbs(self, target_version: int) -> None: + # Get all kb ids + async with self.driver.transaction(read_only=True) as txn: + kbids = [kbid async for kbid, _ in datamanagers.kb.get_kbs(txn)] + # Schedule the migrations async with self.driver.transaction() as txn: - async for kbid, _ in datamanagers.kb.get_kbs(txn): + for kbid in kbids: await txn.set(MIGRATIONS_KEY.format(kbid=kbid), str(target_version).encode()) await txn.commit() @@ -66,7 +70,7 @@ async def delete_kb_migration(self, *, kbid: str) -> None: await txn.commit() async def get_kb_info(self, kbid: str) -> Optional[KnowledgeBoxInfo]: - async with self.driver.transaction() as txn: + async with self.driver.transaction(read_only=True) as txn: kb_config = await datamanagers.kb.get_config(txn, kbid=kbid) if kb_config is None: return None diff --git a/nucliadb/src/nucliadb/reader/api/v1/knowledgebox.py b/nucliadb/src/nucliadb/reader/api/v1/knowledgebox.py index ab3870a938..2698b4299c 100644 --- a/nucliadb/src/nucliadb/reader/api/v1/knowledgebox.py +++ b/nucliadb/src/nucliadb/reader/api/v1/knowledgebox.py @@ -46,7 +46,7 @@ @version(1) async def get_kbs(request: Request, prefix: str = "") -> KnowledgeBoxList: driver = get_driver() - async with driver.transaction() as txn: + async with driver.transaction(read_only=True) as txn: response = KnowledgeBoxList() async for kbid, slug in datamanagers.kb.get_kbs(txn, prefix=prefix): response.kbs.append(KnowledgeBoxObjSummary(slug=slug or None, uuid=kbid)) @@ -64,7 +64,7 @@ async def get_kbs(request: Request, prefix: str = "") -> KnowledgeBoxList: @version(1) async def get_kb(request: Request, kbid: str) -> KnowledgeBoxObj: driver = get_driver() - async with driver.transaction() as txn: + async with driver.transaction(read_only=True) as txn: kb_config = await datamanagers.kb.get_config(txn, kbid=kbid) if kb_config is None: raise HTTPException(status_code=404, detail="Knowledge Box does not exist") @@ -87,7 +87,7 @@ async def get_kb(request: Request, kbid: str) -> KnowledgeBoxObj: @version(1) async def get_kb_by_slug(request: Request, slug: str) -> KnowledgeBoxObj: driver = get_driver() - async with driver.transaction() as txn: + async with driver.transaction(read_only=True) as txn: kbid = await datamanagers.kb.get_kb_uuid(txn, slug=slug) if kbid is None: raise HTTPException(status_code=404, detail="Knowledge Box does not exist") diff --git a/nucliadb/src/nucliadb/search/search/summarize.py b/nucliadb/src/nucliadb/search/search/summarize.py index 2cd4a431b8..91ad047b5f 100644 --- a/nucliadb/src/nucliadb/search/search/summarize.py +++ b/nucliadb/src/nucliadb/search/search/summarize.py @@ -75,7 +75,7 @@ async def get_extracted_texts(kbid: str, resource_uuids_or_slugs: list[str]) -> tasks = [] # Schedule getting extracted text for each field of each resource - async with driver.transaction() as txn: + async with driver.transaction(read_only=True) as txn: if not await datamanagers.kb.exists_kb(txn, kbid=kbid): raise datamanagers.exceptions.KnowledgeBoxNotFound(kbid) diff --git a/nucliadb/src/nucliadb/train/uploader.py b/nucliadb/src/nucliadb/train/uploader.py index dde37d487e..1427246596 100644 --- a/nucliadb/src/nucliadb/train/uploader.py +++ b/nucliadb/src/nucliadb/train/uploader.py @@ -75,9 +75,8 @@ async def GetEntities( # type: ignore ) -> GetEntitiesResponse: kbid = request.kb.uuid response = GetEntitiesResponse() - async with self.proc.driver.transaction() as txn: + async with self.proc.driver.transaction(read_only=True) as txn: kbobj = await self.proc.get_kb_obj(txn, request.kb) - if kbobj is None: response.status = GetEntitiesResponse.Status.NOTFOUND return response