Skip to content

Commit

Permalink
Further reduce the number of SELECT ... FOR UPDATE queries (#2257)
Browse files (browse the repository at this point in the history)
  • Loading branch information
lferran authored Jun 19, 2024
1 parent 3fd2b7f commit c6c2d61
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 50 deletions.
2 changes: 1 addition & 1 deletion nucliadb/src/nucliadb/common/cluster/rebalance.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async def move_set_of_kb_resources(
to_shard_id: str,
count: int = 20,
) -> None:
async with datamanagers.with_transaction() as txn:
async with datamanagers.with_ro_transaction() as txn:
kb_shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
if kb_shards is None: # pragma: no cover
logger.warning("No shards found for kb. This should not happen.", extra={"kbid": kbid})
Expand Down
95 changes: 48 additions & 47 deletions nucliadb/src/nucliadb/common/cluster/rollover.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def create_rollover_shards(
logger.info("Creating rollover shards", extra={"kbid": kbid})
sm = app_context.shard_manager

async with datamanagers.with_transaction() as txn:
async with datamanagers.with_ro_transaction() as txn:
existing_rollover_shards = await datamanagers.rollover.get_kb_rollover_shards(txn, kbid=kbid)
if existing_rollover_shards is not None:
logger.info("Rollover shards already exist, skipping", extra={"kbid": kbid})
Expand All @@ -80,54 +80,55 @@ async def create_rollover_shards(
if kb_shards is None:
raise UnexpectedRolloverError(f"No shards found for KB {kbid}")

# create new shards
created_shards = []
try:
nodes = cluster_manager.sorted_primary_nodes(ignore_nodes=drain_nodes)
for shard in kb_shards.shards:
shard.ClearField("replicas")
# Attempt to create configured number of replicas
replicas_created = 0
while replicas_created < settings.node_replicas:
if len(nodes) == 0:
# could have multiple shards on single node
nodes = cluster_manager.sorted_primary_nodes(ignore_nodes=drain_nodes)
node_id = nodes.pop(0)

node = get_index_node(node_id)
if node is None:
logger.error(f"Node {node_id} is not found or not available")
continue
is_matryoshka = len(kb_shards.model.matryoshka_dimensions) > 0
vector_index_config = nodewriter_pb2.VectorIndexConfig(
similarity=kb_shards.similarity,
vector_type=nodewriter_pb2.VectorType.DENSE_F32,
vector_dimension=kb_shards.model.vector_dimension,
normalize_vectors=is_matryoshka,
# create new shards
created_shards = []
try:
nodes = cluster_manager.sorted_primary_nodes(ignore_nodes=drain_nodes)
for shard in kb_shards.shards:
shard.ClearField("replicas")
# Attempt to create configured number of replicas
replicas_created = 0
while replicas_created < settings.node_replicas:
if len(nodes) == 0:
# could have multiple shards on single node
nodes = cluster_manager.sorted_primary_nodes(ignore_nodes=drain_nodes)
node_id = nodes.pop(0)

node = get_index_node(node_id)
if node is None:
logger.error(f"Node {node_id} is not found or not available")
continue
is_matryoshka = len(kb_shards.model.matryoshka_dimensions) > 0
vector_index_config = nodewriter_pb2.VectorIndexConfig(
similarity=kb_shards.similarity,
vector_type=nodewriter_pb2.VectorType.DENSE_F32,
vector_dimension=kb_shards.model.vector_dimension,
normalize_vectors=is_matryoshka,
)
try:
shard_created = await node.new_shard(
kbid,
release_channel=kb_shards.release_channel,
vector_index_config=vector_index_config,
)
try:
shard_created = await node.new_shard(
kbid,
release_channel=kb_shards.release_channel,
vector_index_config=vector_index_config,
)
except Exception as e:
errors.capture_exception(e)
logger.exception(f"Error creating new shard at {node}")
continue

replica = writer_pb2.ShardReplica(node=str(node_id))
replica.shard.CopyFrom(shard_created)
shard.replicas.append(replica)
created_shards.append(shard)
replicas_created += 1
except Exception as e:
errors.capture_exception(e)
logger.exception("Unexpected error creating new shard")
for created_shard in created_shards:
await sm.rollback_shard(created_shard)
raise e
except Exception as e:
errors.capture_exception(e)
logger.exception(f"Error creating new shard at {node}")
continue

replica = writer_pb2.ShardReplica(node=str(node_id))
replica.shard.CopyFrom(shard_created)
shard.replicas.append(replica)
created_shards.append(shard)
replicas_created += 1
except Exception as e:
errors.capture_exception(e)
logger.exception("Unexpected error creating new shard")
for created_shard in created_shards:
await sm.rollback_shard(created_shard)
raise e

async with datamanagers.with_transaction() as txn:
await datamanagers.rollover.update_kb_rollover_shards(txn, kbid=kbid, kb_shards=kb_shards)
await txn.commit()
return kb_shards
Expand Down
3 changes: 2 additions & 1 deletion nucliadb/src/nucliadb/ingest/orm/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,8 @@ async def index_resource(
if shard is None:
# It's a new resource, get current active shard to place
# new resource on
shard = await self.shard_manager.get_current_active_shard(txn, kbid)
async with datamanagers.with_ro_transaction() as ro_txn:
shard = await self.shard_manager.get_current_active_shard(ro_txn, kbid)
if shard is None:
# no shard available, create a new one
shard = await self.shard_manager.create_shard_by_kbid(txn, kbid)
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/src/nucliadb/writer/back_pressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ async def get_nats_consumer_pending_messages(


async def get_kb_active_shard(context: ApplicationContext, kbid: str) -> Optional[ShardObject]:
async with context.kv_driver.transaction() as txn:
async with context.kv_driver.transaction(read_only=True) as txn:
return await context.shard_manager.get_current_active_shard(txn, kbid)


Expand Down

2 comments on commit c6c2d61

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

| Benchmark suite | Current: c6c2d61 | Previous: 0d03d9f | Ratio |
|---|---|---|---|
| `tests/search/unit/search/test_fetch.py::test_highligh_error` | 2275.689637529642 iter/sec (stddev: 0.000005668304907336078) | 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) | 1.25 |

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

| Benchmark suite | Current: c6c2d61 | Previous: 0d03d9f | Ratio |
|---|---|---|---|
| `tests/search/unit/search/test_fetch.py::test_highligh_error` | 2250.6492013779375 iter/sec (stddev: 0.0000029569565448162155) | 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) | 1.26 |

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.