[DPE-4575] Manage voting exclusions in more detail (#405)
Currently, our charm takes a very simplified approach to managing voting
exclusions. The main issue is that the number of nodes we can exclude from
voting at any one time is limited, set by
`cluster.max_voting_config_exclusions` (default: `10`). Therefore, we need to
clean up this list carefully and periodically; a sketch of the relevant API
calls follows the change list below.

This PR makes the following changes:
* Adds a method to fetch the current voting exclusions
* Decides which voting exclusions should stay, based on the current list of
exclusions marked for deletion
* Moves `delete_current()` so it is executed during `_start_opensearch`
(specifically in the existing `_post_start_init`)
* Adds an integration test that scales from 3->1->3 and checks the exclusion
rules
* Makes continuous writes set up non-balanced indices with "0-all" replicas
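
A minimal sketch of the voting-exclusions API that the new code drives, written
with plain `requests` against an assumed local `https://localhost:9200`
endpoint (the charm itself goes through its own `self._opensearch.request()`
helper, with proper TLS and authentication):

```python
# Illustrative only: the three API calls this PR builds on. Auth/TLS handling is
# omitted; endpoints, parameters and response shape match the charm code below.
from typing import Set

import requests

BASE = "https://localhost:9200"  # assumption for this sketch, not charm config


def fetch_voting_exclusions() -> Set[str]:
    """Names of the nodes currently excluded from voting."""
    resp = requests.get(
        f"{BASE}/_cluster/state/metadata/voting_config_exclusions", verify=False
    ).json()
    return {
        node["node_name"]
        for node in resp["metadata"]["cluster_coordination"]["voting_config_exclusions"]
    }


def add_voting_exclusions(node_names: Set[str]) -> bool:
    """Exclude nodes from voting; fails once `cluster.max_voting_config_exclusions`
    (default 10) is reached -- hence the periodic cleanup introduced in this PR."""
    resp = requests.post(
        f"{BASE}/_cluster/voting_config_exclusions",
        params={"node_names": ",".join(sorted(node_names)), "timeout": "1m"},
        verify=False,
    )
    return resp.status_code < 400


def clear_voting_exclusions() -> bool:
    """The API can only clear the whole exclusions list, never a subset."""
    resp = requests.delete(
        f"{BASE}/_cluster/voting_config_exclusions",
        params={"wait_for_removal": "false"},
        verify=False,
    )
    return resp.status_code < 400
```
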
phvalguima authored Sep 24, 2024
1 parent 7614a18 commit 1f5b9aa
Showing 12 changed files with 740 additions and 107 deletions.
4 changes: 3 additions & 1 deletion lib/charms/opensearch/v0/helper_cluster.py
@@ -166,7 +166,9 @@ def nodes(
host: Optional[str] = None # defaults to current unit ip
alt_hosts: Optional[List[str]] = hosts
if not use_localhost and hosts:
host, alt_hosts = hosts[0], hosts[1:]
host = hosts[0]
if len(hosts) >= 2:
alt_hosts = hosts[1:]

nodes: List[Node] = []
if use_localhost or host:
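
The `helper_cluster.py` change above only matters when a single host is passed:
previously `alt_hosts` collapsed to an empty list, whereas it now keeps the
original `hosts` value as a fallback. A standalone re-implementation of the
patched logic (not charm code) to illustrate:

```python
from typing import List, Optional, Tuple


def split_hosts(
    hosts: Optional[List[str]], use_localhost: bool = False
) -> Tuple[Optional[str], Optional[List[str]]]:
    """Mirror of the patched logic: pick a primary host, keep the rest as alternates."""
    host: Optional[str] = None  # in the charm this defaults to the current unit IP
    alt_hosts: Optional[List[str]] = hosts
    if not use_localhost and hosts:
        host = hosts[0]
        if len(hosts) >= 2:
            alt_hosts = hosts[1:]
    return host, alt_hosts


print(split_hosts(["10.0.0.1", "10.0.0.2"]))  # ('10.0.0.1', ['10.0.0.2'])
print(split_hosts(["10.0.0.1"]))  # ('10.0.0.1', ['10.0.0.1']) -- old code returned []
```
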
38 changes: 23 additions & 15 deletions lib/charms/opensearch/v0/opensearch_base_charm.py
@@ -520,6 +520,22 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent):
else:
event.defer()

if not self.unit.is_leader():
return

# Now, we register the departing unit's name with the leader application.
# We need to save these names because the number of exclusions we can set is limited.
if (
not (deployment_desc := self.opensearch_peer_cm.deployment_desc())
or not event.departing_unit
):
# No deployment description present
# that happens in the very last stages of the application removal
return
self.opensearch_exclusions.add_to_cleanup_list(
unit_name=format_unit_name(event.departing_unit.name, deployment_desc.app)
)

def _on_opensearch_data_storage_detaching(self, _: StorageDetachingEvent): # noqa: C901
"""Triggered when removing unit, Prior to the storage being detached."""
if self.upgrade_in_progress:
@@ -555,6 +571,9 @@ def _on_opensearch_data_storage_detaching(self, _: StorageDetachingEvent): # no
pass
try:
self._stop_opensearch()
if self.alt_hosts:
# There are enough peers available for us to try removing the unit
self.opensearch_exclusions.delete_current()

# safeguards in case planned_units > 0
if self.app.planned_units() > 0:
@@ -596,9 +615,9 @@ def _on_update_status(self, event: UpdateStatusEvent): # noqa: C901
self._add_cm_addresses_to_conf()

# if there are exclusions to be removed
# each unit should check its own exclusions' list
self.opensearch_exclusions.cleanup()
if self.unit.is_leader():
self.opensearch_exclusions.cleanup()

if (health := self.health.apply(wait_for_green_first=True)) not in [
HealthColors.GREEN,
HealthColors.IGNORE,
@@ -1159,7 +1178,7 @@ def _post_start_init(self, event: _StartOpenSearch): # noqa: C901
else:
self.tls.store_admin_tls_secrets_if_applies()

def _stop_opensearch(self, *, restart=False) -> None:
def _stop_opensearch(self, *, restart: bool = False) -> None:
"""Stop OpenSearch if possible."""
self.status.set(WaitingStatus(ServiceIsStopping))

@@ -1171,7 +1190,7 @@ def _stop_opensearch(self, *, restart=False) -> None:
# and re-using storage
if len(nodes) > 1:
# 1. Add current node to the voting + alloc exclusions
self.opensearch_exclusions.add_current(restart=restart)
self.opensearch_exclusions.add_current(voting=True, allocation=not restart)
except OpenSearchHttpError:
logger.debug("Failed to get online nodes, voting and alloc exclusions not added")

@@ -1183,17 +1202,6 @@ def _stop_opensearch(self, *, restart=False) -> None:
self.peers_data.delete(Scope.UNIT, "started")
self.status.set(WaitingStatus(ServiceStopped))

# 3. Remove the exclusions
if not restart:
try:
self.opensearch_exclusions.delete_current()
except Exception:
# It is purposefully broad - as this can fail for HTTP reasons,
# or if the config wasn't set on disk etc. In any way, this operation is on
# a best attempt basis, as this is called upon start as well,
# failure is not blocking at this point of the lifecycle
pass

def _restart_opensearch(self, event: _RestartOpenSearch) -> None:
"""Restart OpenSearch if possible."""
if not self.node_lock.acquired:
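
The departed-unit bookkeeping added to `_on_peer_relation_departed` and
consumed later by `cleanup()` boils down to set arithmetic over
comma-separated lists stored in peer relation data. A simplified,
self-contained sketch (a plain dict stands in for Juju peer data; the helper
and unit names are illustrative):

```python
from typing import Dict, Set


def add_to_cleanup_list(peer_data: Dict[str, str], key: str, unit_name: str) -> None:
    """Record a departing unit in a comma-separated peer-data list, skipping '' entries."""
    current = set(filter(None, peer_data.get(key, "").split(",")))
    current.add(unit_name)
    peer_data[key] = ",".join(sorted(current))


def units_to_cleanup(peer_data: Dict[str, str], key: str, units_present: Set[str]) -> Set[str]:
    """Return the marked units that are no longer part of the cluster."""
    marked = set(filter(None, peer_data.get(key, "").split(",")))
    return {unit for unit in marked if unit not in units_present}


data: Dict[str, str] = {}
add_to_cleanup_list(data, "voting_to_delete", "opensearch-0.main")
add_to_cleanup_list(data, "voting_to_delete", "opensearch-1.main")
# Only opensearch-1 is still deployed, so opensearch-0's exclusion can be dropped:
print(units_to_cleanup(data, "voting_to_delete", {"opensearch-1.main"}))  # {'opensearch-0.main'}
```
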
224 changes: 188 additions & 36 deletions lib/charms/opensearch/v0/opensearch_nodes_exclusions.py
@@ -6,7 +6,8 @@
from functools import cached_property
from typing import List, Optional, Set

from charms.opensearch.v0.models import Node
from charms.opensearch.v0.helper_charm import all_units, format_unit_name
from charms.opensearch.v0.models import Node, PeerClusterApp
from charms.opensearch.v0.opensearch_exceptions import (
OpenSearchError,
OpenSearchHttpError,
@@ -41,76 +41,227 @@ class OpenSearchExclusions:
def __init__(self, charm):
self._charm = charm
self._opensearch = self._charm.opensearch

self._scope = Scope.APP if self._charm.unit.is_leader() else Scope.UNIT

def add_current(self, restart: bool = False) -> None:
"""Add Voting and alloc exclusions."""
if (self._node.is_cm_eligible() or self._node.is_voting_only()) and not self._add_voting():
logger.error(f"Failed to add voting exclusion: {self._node.name}.")
def add_to_cleanup_list(self, unit_name: str) -> None:
"""Add Voting and alloc exclusions for a target unit.
This method is just a clean-up-later routine. We (re)add the unit to the exclusions and,
hence, the leader of this app will be aware of this unit's removal and log it into its
app-level peer data.
"""
for lst in [ALLOCS_TO_DELETE, VOTING_TO_DELETE]:
# Load the content of the list, avoiding '' entries
current_set = set(
filter(None, self._charm.peers_data.get(self._scope, lst, "").split(","))
)
current_set = current_set.union({unit_name})

self._charm.peers_data.put(self._scope, lst, ",".join(current_set))

def add_current(
self, voting: bool = True, allocation: bool = True, raise_error: bool = False
) -> None:
"""Add voting and alloc exclusions."""
if voting and (self._node.is_cm_eligible() or self._node.is_voting_only()):
if not self._add_voting():
logger.error(f"Failed to add voting exclusion: {self._node.name}.")
if raise_error:
raise OpenSearchExclusionsException("Failed to add voting exclusion.")

if not restart:
if self._node.is_data() and not self._add_allocations():
if allocation and self._node.is_data():
if not self._add_allocations():
logger.error(f"Failed to add shard allocation exclusion: {self._node.name}.")
if raise_error:
raise OpenSearchExclusionsException("Failed to add allocation exclusion.")

def delete_current(self) -> None:
"""Delete Voting and alloc exclusions."""
if (
self._node.is_cm_eligible() or self._node.is_voting_only()
) and not self._delete_voting():
self._charm.peers_data.put(self._scope, VOTING_TO_DELETE, True)
def delete_current(
self, voting: bool = True, allocation: bool = True, raise_error: bool = False
) -> None:
"""Delete voting and alloc exclusions."""
if voting and (self._node.is_cm_eligible() or self._node.is_voting_only()):
if not self._delete_voting({self._node.name}):
logger.error(f"Failed to delete voting exclusion: {self._node.name}.")
if raise_error:
raise OpenSearchExclusionsException("Failed to delete voting exclusion.")

if self._node.is_data() and not self._delete_allocations():
current_allocations = set(
self._charm.peers_data.get(self._scope, ALLOCS_TO_DELETE, "").split(",")
)
current_allocations.add(self._node.name)
if allocation and self._node.is_data():
if not self._delete_allocations():
logger.error(f"Failed to delete shard allocation exclusion: {self._node.name}.")
# Load the content of the list, avoiding '' entries
current_allocations = set(
filter(
None,
self._charm.peers_data.get(self._scope, ALLOCS_TO_DELETE, "").split(","),
)
)
current_allocations.add(self._node.name)

self._charm.peers_data.put(
self._scope, ALLOCS_TO_DELETE, ",".join(current_allocations)
)
self._charm.peers_data.put(
self._scope, ALLOCS_TO_DELETE, ",".join(current_allocations)
)
if raise_error:
raise OpenSearchExclusionsException("Failed to delete allocation exclusion.")

def cleanup(self) -> None:
"""Delete all exclusions that failed to be deleted."""
need_voting_cleanup = self._charm.peers_data.get(self._scope, VOTING_TO_DELETE, False)
if need_voting_cleanup and self._delete_voting():
self._charm.peers_data.delete(self._scope, VOTING_TO_DELETE)
self._delete_voting(
self._units_to_cleanup(
list(
filter(
None,
self._charm.peers_data.get(self._scope, VOTING_TO_DELETE, "").split(","),
)
)
)
)

allocations_to_cleanup = self._charm.peers_data.get(
self._scope, ALLOCS_TO_DELETE, ""
).split(",")
allocations_to_cleanup = list(
filter(None, self._charm.peers_data.get(self._scope, ALLOCS_TO_DELETE, "").split(","))
)
if allocations_to_cleanup and self._delete_allocations(allocations_to_cleanup):
self._charm.peers_data.delete(self._scope, ALLOCS_TO_DELETE)

def _add_voting(self) -> bool:
def _units_to_cleanup(self, removable: List[str]) -> Optional[Set[str]]:
"""Deletes all units that have left the cluster via Juju.
This method ensures we keep a small list of voting exclusions at all times.
"""
if (
not (deployment_desc := self._charm.opensearch_peer_cm.deployment_desc())
or not removable
):
return set()

if self._charm.opensearch_peer_cm.is_provider(typ="main") and (
apps_in_fleet := self._charm.peers_data.get_object(Scope.APP, "cluster_fleet_apps")
):
apps_in_fleet = [PeerClusterApp.from_dict(app) for app in apps_in_fleet.values()]
units = {
format_unit_name(u, p_cluster_app.app)
for p_cluster_app in apps_in_fleet
for u in p_cluster_app.units
}
else:
units = {format_unit_name(u, deployment_desc.app) for u in all_units(self._charm)}

# Now, we need to remove the units that were marked for deletion and are not in the
# cluster anymore.
to_remove = []
for node in removable:
if node not in units:
# Unit no longer exists in the cluster, so its exclusion can be removed
to_remove.append(node)
return set(to_remove)

def _get_voting_to_delete(self) -> Set[str]:
"""Return the list of voting exclusions to delete."""
return set(
filter(
None,
self._charm.peers_data.get(self._scope, VOTING_TO_DELETE, "").split(","),
)
)

def _add_voting(self, exclusions: Optional[Set[str]] = None) -> bool:
"""Include the current node in the CMs voting exclusions list of nodes."""
try:
self._opensearch.request(
to_add = exclusions or {self._node.name}
response = self._opensearch.request(
"POST",
f"/_cluster/voting_config_exclusions?node_names={self._node.name}&timeout=1m",
f"/_cluster/voting_config_exclusions?node_names={','.join(to_add)}&timeout=1m",
alt_hosts=self._charm.alt_hosts,
resp_status_code=True,
retries=3,
)
return True
logger.debug("Added voting, response: %s", response)

self._charm.peers_data.put(
self._scope,
VOTING_TO_DELETE,
",".join(to_add.union(self._get_voting_to_delete())),
)
# The voting exclusions API returns only a status code
return response < 400
except OpenSearchHttpError:
return False

def _delete_voting(self) -> bool:
"""Remove all the voting exclusions - cannot target 1 exclusion at a time."""
def _delete_voting(self, exclusions: Set[str]) -> bool:
"""Remove all voting exclusions and then re-adds the subset that should stay.
The API does not allow to remove a subset of the voting exclusions, at once.
"""
current_excl = self._fetch_voting()
logger.debug("Current voting exclusions: %s", current_excl)
if not current_excl:
to_stay = None
else:
to_stay = current_excl - exclusions
if current_excl == to_stay:
# Nothing to do
logger.debug("No voting exclusions to delete, current set is %s", to_stay)
return True

# "wait_for_removal" is VERY important, it removes all voting configs immediately
# and allows any node to return to the voting config in the future
try:
self._opensearch.request(
response = self._opensearch.request(
"DELETE",
"/_cluster/voting_config_exclusions?wait_for_removal=false",
alt_hosts=self._charm.alt_hosts,
resp_status_code=True,
)
return True
if response >= 400:
logger.debug("Failed to remove voting exclusions, response %s", response)
return False

logger.debug("Removed voting for: %s", exclusions)
if to_stay:
# Now, we register the units that should stay
response = self._opensearch.request(
"POST",
f"/_cluster/voting_config_exclusions?node_names={','.join(sorted(to_stay))}&timeout=1m",
alt_hosts=self._charm.alt_hosts,
resp_status_code=True,
retries=3,
)
if response >= 400:
logger.debug("Failed to remove voting exclusions, response %s", response)
return False

# Finally, we clean up the VOTING_TO_DELETE
self._charm.peers_data.put(
self._scope,
VOTING_TO_DELETE,
",".join(self._get_voting_to_delete() - exclusions),
)
return response < 400
except OpenSearchHttpError:
return False

def _fetch_voting(self) -> Set[str]:
"""Fetch the registered voting exclusions."""
try:
resp = self._opensearch.request(
"GET",
"/_cluster/state/metadata/voting_config_exclusions",
alt_hosts=self._charm.alt_hosts,
)
return set(
sorted(
[
node["node_name"]
for node in resp["metadata"]["cluster_coordination"][
"voting_config_exclusions"
]
]
)
)
except (OpenSearchHttpError, KeyError) as e:
logger.warning(f"Failed to fetch voting exclusions: {e}")
# no voting exclusion set
return set()

def _add_allocations(
self, allocations: Optional[Set[str]] = None, override: bool = False
) -> bool:
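
Since the API cannot delete a single exclusion, the new `_delete_voting()`
clears the whole list and re-adds whatever should remain. The pattern,
distilled (the `clear_all`/`re_add` callables stand in for the DELETE and POST
requests sketched earlier; function names here are illustrative, not charm API):

```python
from typing import Callable, Set


def delete_some_exclusions(
    current: Set[str],
    to_remove: Set[str],
    clear_all: Callable[[], bool],  # DELETE /_cluster/voting_config_exclusions?wait_for_removal=false
    re_add: Callable[[Set[str]], bool],  # POST ...?node_names=<comma-separated names>
) -> bool:
    """Drop `to_remove` from the exclusions by clearing everything and re-adding the rest."""
    to_stay = current - to_remove
    if to_stay == current:
        return True  # none of the targets is actually excluded: no API call needed
    if not clear_all():
        return False
    # Re-register only the exclusions that should survive the cleanup.
    return not to_stay or re_add(to_stay)
```
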
13 changes: 11 additions & 2 deletions tests/integration/ha/conftest.py
@@ -8,7 +8,7 @@
from pytest_operator.plugin import OpsTest

from ..helpers import APP_NAME, get_application_unit_ids
from .continuous_writes import ContinuousWrites
from .continuous_writes import ContinuousWrites, ReplicationMode
from .helpers import ORIGINAL_RESTART_DELAY, app_name, update_restart_delay

logger = logging.getLogger(__name__)
@@ -39,10 +39,19 @@ async def c_writes_runner(ops_test: OpsTest, c_writes: ContinuousWrites):
logger.info("\n\n\n\nThe writes have been cleared.\n\n\n\n")


@pytest.fixture(scope="function")
async def c_0_repl_writes_runner(ops_test: OpsTest, c_writes: ContinuousWrites):
"""Starts continuous write operations and clears writes at the end of the test."""
await c_writes.start(repl_mode=ReplicationMode.WITH_AT_LEAST_0_REPL)
yield
await c_writes.clear()
logger.info("\n\n\n\nThe writes have been cleared.\n\n\n\n")


@pytest.fixture(scope="function")
async def c_balanced_writes_runner(ops_test: OpsTest, c_writes: ContinuousWrites):
"""Same as previous runner, but starts continuous writes on cluster wide replicated index."""
await c_writes.start(repl_on_all_nodes=True)
await c_writes.start(repl_mode=ReplicationMode.WITH_AT_LEAST_1_REPL)
yield
await c_writes.clear()
logger.info("\n\n\n\nThe writes have been cleared.\n\n\n\n")
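
`continuous_writes.py` is not part of this excerpt, so the exact meaning of
`ReplicationMode` is an assumption here: the "0-all" wording most plausibly
maps to the index-level `auto_expand_replicas` setting, which lets the number
of replicas follow the number of data nodes instead of pinning a fixed count.
For illustration only:

```python
# Assumed index settings behind the two modes (illustrative; the real
# ContinuousWrites helper may configure this differently).
NON_BALANCED_INDEX = {"settings": {"index": {"auto_expand_replicas": "0-all"}}}
# The "balanced" mode presumably pins at least one replica instead, e.g.:
BALANCED_INDEX = {"settings": {"index": {"number_of_replicas": 1}}}
```
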