Skip to content

Commit

Permalink
Reduce number of select for updates (#2256)
Browse files Browse the repository at this point in the history
  • Loading branch information
lferran authored Jun 19, 2024
1 parent aea26bd commit 3fd2b7f
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 27 deletions.
10 changes: 6 additions & 4 deletions nucliadb/src/nucliadb/common/cluster/rebalance.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,13 @@ async def rebalance_kb(context: ApplicationContext, kbid: str) -> None:
async def run(context: ApplicationContext) -> None:
try:
async with locking.distributed_lock(REBALANCE_LOCK):
# get all kb ids
async with datamanagers.with_ro_transaction() as txn:
kbids = [kbid async for kbid, _ in datamanagers.kb.get_kbs(txn)]
# go through each kb and see if shards need to be reduced in size
async with datamanagers.with_transaction() as txn:
async for kbid, _ in datamanagers.kb.get_kbs(txn):
async with locking.distributed_lock(locking.KB_SHARDS_LOCK.format(kbid=kbid)):
await rebalance_kb(context, kbid)
for kbid in kbids:
async with locking.distributed_lock(locking.KB_SHARDS_LOCK.format(kbid=kbid)):
await rebalance_kb(context, kbid)
except locking.ResourceLocked as exc:
if exc.key == REBALANCE_LOCK:
logger.warning("Another rebalance process is already running.")
Expand Down
9 changes: 5 additions & 4 deletions nucliadb/src/nucliadb/ingest/orm/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,16 @@ async def txn(
if len(messages) == 0:
return None

txn = await self.driver.begin()
kbid = messages[0].kbid
if not await datamanagers.kb.exists_kb(txn, kbid=kbid):
if not await datamanagers.atomic.kb.exists_kb(kbid=kbid):
logger.info(f"KB {kbid} is deleted: skiping txn")
if transaction_check:
await sequence_manager.set_last_seqid(txn, partition, seqid)
await txn.commit()
async with datamanagers.with_rw_transaction() as txn:
await sequence_manager.set_last_seqid(txn, partition, seqid)
await txn.commit()
return None

txn = await self.driver.begin()
try:
multi = messages[0].multiid
kb = KnowledgeBox(txn, self.storage, kbid)
Expand Down
28 changes: 17 additions & 11 deletions nucliadb/src/nucliadb/ingest/service/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,14 @@ async def NewEntitiesGroup( # type: ignore
self, request: NewEntitiesGroupRequest, context=None
) -> NewEntitiesGroupResponse:
response = NewEntitiesGroupResponse()
async with self.driver.transaction() as txn:
kbobj = await self.proc.get_kb_obj(txn, request.kb)
async with self.driver.transaction(read_only=True) as ro_txn:
kbobj = await self.proc.get_kb_obj(ro_txn, request.kb)
if kbobj is None:
response.status = NewEntitiesGroupResponse.Status.KB_NOT_FOUND
return response

async with self.driver.transaction() as txn:
kbobj.txn = txn
entities_manager = EntitiesManager(kbobj, txn)
try:
await entities_manager.create_entities_group(request.group, request.entities)
Expand All @@ -232,7 +234,6 @@ async def GetEntities( # type: ignore
response = GetEntitiesResponse()
async with self.driver.transaction(read_only=True) as txn:
kbobj = await self.proc.get_kb_obj(txn, request.kb)

if kbobj is None:
response.status = GetEntitiesResponse.Status.NOTFOUND
return response
Expand All @@ -255,7 +256,6 @@ async def ListEntitiesGroups( # type: ignore
response = ListEntitiesGroupsResponse()
async with self.driver.transaction(read_only=True) as txn:
kbobj = await self.proc.get_kb_obj(txn, request.kb)

if kbobj is None:
response.status = ListEntitiesGroupsResponse.Status.NOTFOUND
return response
Expand Down Expand Up @@ -303,12 +303,14 @@ async def GetEntitiesGroup( # type: ignore

async def SetEntities(self, request: SetEntitiesRequest, context=None) -> OpStatusWriter: # type: ignore
response = OpStatusWriter()
async with self.driver.transaction() as txn:
kbobj = await self.proc.get_kb_obj(txn, request.kb)
async with self.driver.transaction(read_only=True) as ro_txn:
kbobj = await self.proc.get_kb_obj(ro_txn, request.kb)
if kbobj is None:
response.status = OpStatusWriter.Status.NOTFOUND
return response

async with self.driver.transaction() as txn:
kbobj.txn = txn
entities_manager = EntitiesManager(kbobj, txn)
try:
await entities_manager.set_entities_group(request.group, request.entities)
Expand All @@ -325,14 +327,15 @@ async def UpdateEntitiesGroup( # type: ignore
self, request: UpdateEntitiesGroupRequest, context=None
) -> UpdateEntitiesGroupResponse:
response = UpdateEntitiesGroupResponse()
async with self.driver.transaction() as txn:
kbobj = await self.proc.get_kb_obj(txn, request.kb)
async with self.driver.transaction(read_only=True) as ro_txn:
kbobj = await self.proc.get_kb_obj(ro_txn, request.kb)
if kbobj is None:
response.status = UpdateEntitiesGroupResponse.Status.KB_NOT_FOUND
return response

async with self.driver.transaction() as txn:
kbobj.txn = txn
entities_manager = EntitiesManager(kbobj, txn)

try:
await entities_manager.set_entities_group_metadata(
request.group,
Expand All @@ -352,12 +355,15 @@ async def UpdateEntitiesGroup( # type: ignore

async def DelEntities(self, request: DelEntitiesRequest, context=None) -> OpStatusWriter: # type: ignore
response = OpStatusWriter()
async with self.driver.transaction() as txn:
kbobj = await self.proc.get_kb_obj(txn, request.kb)

async with self.driver.transaction(read_only=True) as ro_txn:
kbobj = await self.proc.get_kb_obj(ro_txn, request.kb)
if kbobj is None:
response.status = OpStatusWriter.Status.NOTFOUND
return response

async with self.driver.transaction() as txn:
kbobj.txn = txn
entities_manager = EntitiesManager(kbobj, txn)
try:
await entities_manager.delete_entities_group(request.group)
Expand Down
8 changes: 6 additions & 2 deletions nucliadb/src/nucliadb/migrator/datamanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ def __init__(self, driver: Driver):
self.driver = driver

async def schedule_all_kbs(self, target_version: int) -> None:
# Get all kb ids
async with self.driver.transaction(read_only=True) as txn:
kbids = [kbid async for kbid, _ in datamanagers.kb.get_kbs(txn)]
# Schedule the migrations
async with self.driver.transaction() as txn:
async for kbid, _ in datamanagers.kb.get_kbs(txn):
for kbid in kbids:
await txn.set(MIGRATIONS_KEY.format(kbid=kbid), str(target_version).encode())
await txn.commit()

Expand All @@ -66,7 +70,7 @@ async def delete_kb_migration(self, *, kbid: str) -> None:
await txn.commit()

async def get_kb_info(self, kbid: str) -> Optional[KnowledgeBoxInfo]:
async with self.driver.transaction() as txn:
async with self.driver.transaction(read_only=True) as txn:
kb_config = await datamanagers.kb.get_config(txn, kbid=kbid)
if kb_config is None:
return None
Expand Down
6 changes: 3 additions & 3 deletions nucliadb/src/nucliadb/reader/api/v1/knowledgebox.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
@version(1)
async def get_kbs(request: Request, prefix: str = "") -> KnowledgeBoxList:
driver = get_driver()
async with driver.transaction() as txn:
async with driver.transaction(read_only=True) as txn:
response = KnowledgeBoxList()
async for kbid, slug in datamanagers.kb.get_kbs(txn, prefix=prefix):
response.kbs.append(KnowledgeBoxObjSummary(slug=slug or None, uuid=kbid))
Expand All @@ -64,7 +64,7 @@ async def get_kbs(request: Request, prefix: str = "") -> KnowledgeBoxList:
@version(1)
async def get_kb(request: Request, kbid: str) -> KnowledgeBoxObj:
driver = get_driver()
async with driver.transaction() as txn:
async with driver.transaction(read_only=True) as txn:
kb_config = await datamanagers.kb.get_config(txn, kbid=kbid)
if kb_config is None:
raise HTTPException(status_code=404, detail="Knowledge Box does not exist")
Expand All @@ -87,7 +87,7 @@ async def get_kb(request: Request, kbid: str) -> KnowledgeBoxObj:
@version(1)
async def get_kb_by_slug(request: Request, slug: str) -> KnowledgeBoxObj:
driver = get_driver()
async with driver.transaction() as txn:
async with driver.transaction(read_only=True) as txn:
kbid = await datamanagers.kb.get_kb_uuid(txn, slug=slug)
if kbid is None:
raise HTTPException(status_code=404, detail="Knowledge Box does not exist")
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/src/nucliadb/search/search/summarize.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async def get_extracted_texts(kbid: str, resource_uuids_or_slugs: list[str]) ->
tasks = []

# Schedule getting extracted text for each field of each resource
async with driver.transaction() as txn:
async with driver.transaction(read_only=True) as txn:
if not await datamanagers.kb.exists_kb(txn, kbid=kbid):
raise datamanagers.exceptions.KnowledgeBoxNotFound(kbid)

Expand Down
3 changes: 1 addition & 2 deletions nucliadb/src/nucliadb/train/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,8 @@ async def GetEntities( # type: ignore
) -> GetEntitiesResponse:
kbid = request.kb.uuid
response = GetEntitiesResponse()
async with self.proc.driver.transaction() as txn:
async with self.proc.driver.transaction(read_only=True) as txn:
kbobj = await self.proc.get_kb_obj(txn, request.kb)

if kbobj is None:
response.status = GetEntitiesResponse.Status.NOTFOUND
return response
Expand Down

2 comments on commit 3fd2b7f

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 3fd2b7f Previous: 0d03d9f Ratio
tests/search/unit/search/test_fetch.py::test_highligh_error 2259.289699344487 iter/sec (stddev: 0.000004567517136257628) 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) 1.26

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 3fd2b7f Previous: 0d03d9f Ratio
tests/search/unit/search/test_fetch.py::test_highligh_error 2156.102649367142 iter/sec (stddev: 0.000006981769826515487) 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) 1.32

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.