From c6c2d612a6f6d8b068c7c7b21e7880d5bdea4e10 Mon Sep 17 00:00:00 2001
From: Ferran Llamas
Date: Wed, 19 Jun 2024 12:51:16 +0200
Subject: [PATCH] Further reduce the number of select for updates (#2257)

---
 .../src/nucliadb/common/cluster/rebalance.py  |  2 +-
 .../src/nucliadb/common/cluster/rollover.py   | 95 ++++++++++---------
 .../nucliadb/ingest/orm/processor/__init__.py |  3 +-
 nucliadb/src/nucliadb/writer/back_pressure.py |  2 +-
 4 files changed, 52 insertions(+), 50 deletions(-)

diff --git a/nucliadb/src/nucliadb/common/cluster/rebalance.py b/nucliadb/src/nucliadb/common/cluster/rebalance.py
index d33ab731ab..2c435df3dd 100644
--- a/nucliadb/src/nucliadb/common/cluster/rebalance.py
+++ b/nucliadb/src/nucliadb/common/cluster/rebalance.py
@@ -86,7 +86,7 @@ async def move_set_of_kb_resources(
     to_shard_id: str,
     count: int = 20,
 ) -> None:
-    async with datamanagers.with_transaction() as txn:
+    async with datamanagers.with_ro_transaction() as txn:
         kb_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
         if kb_shards is None:  # pragma: no cover
             logger.warning("No shards found for kb. This should not happen.", extra={"kbid": kbid})
diff --git a/nucliadb/src/nucliadb/common/cluster/rollover.py b/nucliadb/src/nucliadb/common/cluster/rollover.py
index eee49a8cd2..6faa27a8ce 100644
--- a/nucliadb/src/nucliadb/common/cluster/rollover.py
+++ b/nucliadb/src/nucliadb/common/cluster/rollover.py
@@ -70,7 +70,7 @@ async def create_rollover_shards(
     logger.info("Creating rollover shards", extra={"kbid": kbid})
     sm = app_context.shard_manager
 
-    async with datamanagers.with_transaction() as txn:
+    async with datamanagers.with_ro_transaction() as txn:
         existing_rollover_shards = await datamanagers.rollover.get_kb_rollover_shards(txn, kbid=kbid)
         if existing_rollover_shards is not None:
             logger.info("Rollover shards already exist, skipping", extra={"kbid": kbid})
@@ -80,54 +80,55 @@
         if kb_shards is None:
             raise UnexpectedRolloverError(f"No shards found for KB {kbid}")
 
-        # create new shards
-        created_shards = []
-        try:
-            nodes = cluster_manager.sorted_primary_nodes(ignore_nodes=drain_nodes)
-            for shard in kb_shards.shards:
-                shard.ClearField("replicas")
-                # Attempt to create configured number of replicas
-                replicas_created = 0
-                while replicas_created < settings.node_replicas:
-                    if len(nodes) == 0:
-                        # could have multiple shards on single node
-                        nodes = cluster_manager.sorted_primary_nodes(ignore_nodes=drain_nodes)
-                    node_id = nodes.pop(0)
-
-                    node = get_index_node(node_id)
-                    if node is None:
-                        logger.error(f"Node {node_id} is not found or not available")
-                        continue
-                    is_matryoshka = len(kb_shards.model.matryoshka_dimensions) > 0
-                    vector_index_config = nodewriter_pb2.VectorIndexConfig(
-                        similarity=kb_shards.similarity,
-                        vector_type=nodewriter_pb2.VectorType.DENSE_F32,
-                        vector_dimension=kb_shards.model.vector_dimension,
-                        normalize_vectors=is_matryoshka,
+    # create new shards
+    created_shards = []
+    try:
+        nodes = cluster_manager.sorted_primary_nodes(ignore_nodes=drain_nodes)
+        for shard in kb_shards.shards:
+            shard.ClearField("replicas")
+            # Attempt to create configured number of replicas
+            replicas_created = 0
+            while replicas_created < settings.node_replicas:
+                if len(nodes) == 0:
+                    # could have multiple shards on single node
+                    nodes = cluster_manager.sorted_primary_nodes(ignore_nodes=drain_nodes)
+                node_id = nodes.pop(0)
+
+                node = get_index_node(node_id)
+                if node is None:
+                    logger.error(f"Node {node_id} is not found or not available")
+                    continue
+                is_matryoshka = len(kb_shards.model.matryoshka_dimensions) > 0
+                vector_index_config = nodewriter_pb2.VectorIndexConfig(
+                    similarity=kb_shards.similarity,
+                    vector_type=nodewriter_pb2.VectorType.DENSE_F32,
+                    vector_dimension=kb_shards.model.vector_dimension,
+                    normalize_vectors=is_matryoshka,
+                )
+                try:
+                    shard_created = await node.new_shard(
+                        kbid,
+                        release_channel=kb_shards.release_channel,
+                        vector_index_config=vector_index_config,
                     )
-                    try:
-                        shard_created = await node.new_shard(
-                            kbid,
-                            release_channel=kb_shards.release_channel,
-                            vector_index_config=vector_index_config,
-                        )
-                    except Exception as e:
-                        errors.capture_exception(e)
-                        logger.exception(f"Error creating new shard at {node}")
-                        continue
-
-                    replica = writer_pb2.ShardReplica(node=str(node_id))
-                    replica.shard.CopyFrom(shard_created)
-                    shard.replicas.append(replica)
-                    created_shards.append(shard)
-                    replicas_created += 1
-        except Exception as e:
-            errors.capture_exception(e)
-            logger.exception("Unexpected error creating new shard")
-            for created_shard in created_shards:
-                await sm.rollback_shard(created_shard)
-            raise e
+                except Exception as e:
+                    errors.capture_exception(e)
+                    logger.exception(f"Error creating new shard at {node}")
+                    continue
+
+                replica = writer_pb2.ShardReplica(node=str(node_id))
+                replica.shard.CopyFrom(shard_created)
+                shard.replicas.append(replica)
+                created_shards.append(shard)
+                replicas_created += 1
+    except Exception as e:
+        errors.capture_exception(e)
+        logger.exception("Unexpected error creating new shard")
+        for created_shard in created_shards:
+            await sm.rollback_shard(created_shard)
+        raise e
 
+    async with datamanagers.with_transaction() as txn:
         await datamanagers.rollover.update_kb_rollover_shards(txn, kbid=kbid, kb_shards=kb_shards)
         await txn.commit()
         return kb_shards
diff --git a/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py b/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py
index b2f91d5c63..cd19c7fa85 100644
--- a/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py
+++ b/nucliadb/src/nucliadb/ingest/orm/processor/__init__.py
@@ -392,7 +392,8 @@ async def index_resource(
         if shard is None:
             # It's a new resource, get current active shard to place
             # new resource on
-            shard = await self.shard_manager.get_current_active_shard(txn, kbid)
+            async with datamanagers.with_ro_transaction() as ro_txn:
+                shard = await self.shard_manager.get_current_active_shard(ro_txn, kbid)
             if shard is None:
                 # no shard available, create a new one
                 shard = await self.shard_manager.create_shard_by_kbid(txn, kbid)
diff --git a/nucliadb/src/nucliadb/writer/back_pressure.py b/nucliadb/src/nucliadb/writer/back_pressure.py
index 45fa8c5c76..8f8a2053e0 100644
--- a/nucliadb/src/nucliadb/writer/back_pressure.py
+++ b/nucliadb/src/nucliadb/writer/back_pressure.py
@@ -504,7 +504,7 @@ async def get_nats_consumer_pending_messages(
 
 
 async def get_kb_active_shard(context: ApplicationContext, kbid: str) -> Optional[ShardObject]:
-    async with context.kv_driver.transaction() as txn:
+    async with context.kv_driver.transaction(read_only=True) as txn:
         return await context.shard_manager.get_current_active_shard(txn, kbid)
 
 
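
Note (editor's illustration, not part of the patch): the four files touched here all apply the same pattern. Reads that previously ran inside a read-write transaction, which is where the select-for-update queries mentioned in the subject come from, now run in a read-only transaction, and in create_rollover_shards the write transaction is additionally narrowed so that the slow shard-creation work happens outside any transaction. Below is a minimal sketch of that pattern, reusing the datamanagers helpers visible in the diff; the wrapper function name and the import path are assumptions, not code from the patch.

# Sketch only, not part of the patch. Import path assumed from the repo layout
# (nucliadb/src/nucliadb/common/); the datamanagers calls are the ones used above.
from nucliadb.common import datamanagers


async def refresh_rollover_shards(kbid: str) -> None:
    # Hypothetical wrapper, for illustration.
    # 1) Pure reads: a read-only transaction is enough, so no write locks are taken.
    async with datamanagers.with_ro_transaction() as ro_txn:
        kb_shards = await datamanagers.cluster.get_kb_shards(ro_txn, kbid=kbid)
    if kb_shards is None:
        return

    # 2) Slow work (e.g. creating shards on index nodes) runs outside any
    #    transaction, so no locks are held while waiting on the network.
    ...

    # 3) A short-lived read-write transaction is opened only for the actual mutation.
    async with datamanagers.with_transaction() as txn:
        await datamanagers.rollover.update_kb_rollover_shards(txn, kbid=kbid, kb_shards=kb_shards)
        await txn.commit()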