Skip to content

Commit

Permalink
Bug fix
Browse files Browse the repository at this point in the history
Signed-off-by: GilboaAWS <gilboabg@amazon.com>
  • Loading branch information
GilboaAWS committed Jan 6, 2025
1 parent 30b35f2 commit b2caf01
Showing 1 changed file with 20 additions and 16 deletions.
36 changes: 20 additions & 16 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1454,20 +1454,21 @@ where
.get_mut(&address_clone_for_task)
{
refresh_state.node_conn = Some(node);
}
connection_container
.refresh_addresses_done
.insert(address_clone_for_task);
Ok(())
};
}
Err(err) => {
warn!(
"Failed to refresh connection for node {}. Error: `{:?}`",
address_clone_for_task, err
);
Err(err)
}
}
inner_clone
.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.refresh_addresses_done
.insert(address_clone_for_task);
}
.await;

Expand All @@ -1487,7 +1488,7 @@ where
},
);
}
debug!("refresh connection tasts initiated");
debug!("refresh connection tasks initiated");
}

async fn aggregate_results(
Expand Down Expand Up @@ -2338,7 +2339,7 @@ where
)
.await;
return Err(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
ErrorKind::ConnectionNotFoundForRoute,
"No connection for the address, started a refresh task",
)));
}
Expand Down Expand Up @@ -2442,6 +2443,13 @@ where
loop {
let connections_container = self.inner.conn_lock.read().expect(MUTEX_WRITE_ERR);

// Check if both sets are empty
if connections_container.refresh_addresses_started.is_empty()
&& connections_container.refresh_addresses_done.is_empty()
{
break;
}

// Process refresh_addresses_started
let addresses_to_remove: Vec<String> = connections_container
.refresh_addresses_started
Expand Down Expand Up @@ -2483,14 +2491,10 @@ where
}
}
// Remove this entry from refresh_ops_map
connections_container.refresh_operations.remove(&address);
}

// Check if both sets are empty
if connections_container.refresh_addresses_started.is_empty()
&& connections_container.refresh_addresses_done.is_empty()
{
break;
match connections_container.refresh_operations.remove(&address) {
Some(_) => (),
None => warn!("update_refreshed_connection: No refresh operation found to remove for address: {:?}", address),
}
}

// Release the lock before the next iteration
Expand Down

0 comments on commit b2caf01

Please sign in to comment.