Skip to content

Commit

Permalink
Add cluster/statistics endpoint handling.
Browse files Browse the repository at this point in the history
This commit adds a new cluster method get_cluster_statistics()
which retrieves valuable statistics from the cluster.
  • Loading branch information
jfrancoa committed Nov 27, 2024
1 parent 7f36591 commit 0f880d8
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 6 deletions.
7 changes: 3 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ repos:
- id: autoflake
args: [--in-place, --remove-all-unused-imports, --exclude=weaviate/proto/*]


- repo: https://github.com/PyCQA/flake8
rev: 7.1.0
hooks:
Expand All @@ -36,13 +35,13 @@ repos:
]
files: '^weaviate/collections'

- repo: local
hooks:
- repo: local
hooks:
- id: mypy
name: mypy
entry: ./run-mypy.sh
language: python
language_version: "3.11"
language_version: "3.12"
# use require_serial so that script
# is only called once per commit
require_serial: true
Expand Down
63 changes: 63 additions & 0 deletions integration_v3/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,66 @@ def test_get_nodes_status_with_data(client: weaviate.Client):
assert shards[0]["class"] == class_name1
assert shards[0]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT
assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT


def test_get_cluster_statistics(client: weaviate.Client):
    """Check the layout and value types of the cluster statistics payload."""
    payload = client.cluster.get_cluster_statistics()

    # Top-level keys
    assert "statistics" in payload
    assert "synchronized" in payload
    assert isinstance(payload["synchronized"], bool)

    # Per-node entries: a list holding at least one node
    per_node = payload["statistics"]
    assert isinstance(per_node, list)
    assert len(per_node) >= 1

    # Only the first node is inspected
    first = per_node[0]
    # "bootstrapped" may be absent, so it is only type-checked when present
    if "bootstrapped" in first:
        assert isinstance(first["bootstrapped"], bool)

    candidates = first["candidates"]
    assert isinstance(candidates, dict)
    # An empty mapping simply yields no iterations
    for candidate_name, candidate_address in candidates.items():
        assert isinstance(candidate_name, str)
        assert isinstance(candidate_address, str)
        assert ":" in candidate_address  # Address should be in format IP:PORT

    # Wire names are used here: the API returns 'open', not 'open_'
    node_field_types = (
        ("dbLoaded", bool),
        ("isVoter", bool),
        ("leaderAddress", str),
        ("leaderId", str),
        ("name", str),
        ("open", bool),
        ("ready", bool),
        ("status", str),
    )
    for key, expected_type in node_field_types:
        assert isinstance(first[key], expected_type)

    # Raft statistics: every entry is a string except the peer list
    raft = first["raft"]
    raft_field_types = (
        ("appliedIndex", str),
        ("commitIndex", str),
        ("fsmPending", str),
        ("lastContact", str),
        ("lastLogIndex", str),
        ("lastLogTerm", str),
        ("lastSnapshotIndex", str),
        ("lastSnapshotTerm", str),
        ("latestConfiguration", list),
        ("latestConfigurationIndex", str),
        ("numPeers", str),
        ("protocolVersion", str),
        ("protocolVersionMax", str),
        ("protocolVersionMin", str),
        ("snapshotVersionMax", str),
        ("snapshotVersionMin", str),
        ("state", str),
        ("term", str),
    )
    for key, expected_type in raft_field_types:
        assert isinstance(raft[key], expected_type)

    # The configuration must list at least one peer
    peers = raft["latestConfiguration"]
    assert len(peers) >= 1
    first_peer = peers[0]
    assert isinstance(first_peer["address"], str)
    assert isinstance(first_peer["id"], str)  # API returns 'id', not 'id_'
    assert isinstance(first_peer["suffrage"], int)
114 changes: 114 additions & 0 deletions test/cluster/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,117 @@ def test_get_nodes_status(self):
result = Cluster(mock_conn).get_nodes_status()
self.assertListEqual(result, expected_resp.get("nodes"))
mock_conn.get.assert_called_with(path="/nodes")

def test_get_cluster_statistics(self):
    """
    Test the `get_cluster_statistics` method.

    Covers the three failure paths (unexpected status code, empty response,
    connection error) and one successful call whose mocked payload uses the
    key names the server sends ('open' and 'id').
    """

    # error messages
    unexpected_err_msg = "Cluster statistics"
    empty_response_err_msg = "Cluster statistics response returned empty"
    connection_err_msg = "Get cluster statistics failed due to connection error"

    # expected failure
    mock_conn = mock_connection_func("get", status_code=500)
    with self.assertRaises(UnexpectedStatusCodeException) as error:
        Cluster(mock_conn).get_cluster_statistics()
    check_startswith_error_message(self, error, unexpected_err_msg)

    mock_conn = mock_connection_func("get", status_code=200, return_json=None)
    with self.assertRaises(EmptyResponseException) as error:
        Cluster(mock_conn).get_cluster_statistics()
    check_error_message(self, error, empty_response_err_msg)

    mock_conn = mock_connection_func("get", side_effect=RequestsConnectionError)
    with self.assertRaises(RequestsConnectionError) as error:
        Cluster(mock_conn).get_cluster_statistics()
    check_error_message(self, error, connection_err_msg)

    # expected success
    def raft_stats(last_contact):
        # Both mocked nodes share the same raft section apart from `lastContact`.
        return {
            "appliedIndex": "3",
            "commitIndex": "3",
            "fsmPending": "0",
            "lastContact": last_contact,
            "lastLogIndex": "3",
            "lastLogTerm": "2",
            "lastSnapshotIndex": "0",
            "lastSnapshotTerm": "0",
            "latestConfiguration": [
                {"address": "10.244.1.3:8300", "id": "weaviate-1", "suffrage": 0},
                {"address": "10.244.3.3:8300", "id": "weaviate-2", "suffrage": 0},
                {"address": "10.244.2.3:8300", "id": "weaviate-0", "suffrage": 0},
            ],
            "latestConfigurationIndex": "0",
            "numPeers": "2",
            "protocolVersion": "3",
            "protocolVersionMax": "3",
            "protocolVersionMin": "0",
            "snapshotVersionMax": "1",
            "snapshotVersionMin": "0",
            "state": "Follower",
            "term": "2",
        }

    expected_resp = {
        "statistics": [
            {
                # no "bootstrapped" key here: the field is optional
                "candidates": {
                    "weaviate-0": "10.244.2.3:8300",
                    "weaviate-1": "10.244.1.3:8300",
                },
                "dbLoaded": True,
                "isVoter": True,
                "leaderAddress": "10.244.3.3:8300",
                "leaderId": "weaviate-2",
                "name": "weaviate-0",
                "open": True,
                "raft": raft_stats("29.130625ms"),
                "ready": True,
                "status": "HEALTHY",
            },
            {
                "bootstrapped": True,
                "candidates": {},
                "dbLoaded": True,
                "isVoter": True,
                "leaderAddress": "10.244.3.3:8300",
                "leaderId": "weaviate-2",
                "name": "weaviate-1",
                "open": True,
                "raft": raft_stats("41.289833ms"),
                "ready": True,
                "status": "HEALTHY",
            },
        ],
        "synchronized": True,
    }
    mock_conn = mock_connection_func("get", status_code=200, return_json=expected_resp)
    result = Cluster(mock_conn).get_cluster_statistics()

    # The payload must come back as-is, with no key renaming.
    self.assertEqual(result, expected_resp)
    mock_conn.get.assert_called_with(path="/cluster/statistics")
32 changes: 31 additions & 1 deletion weaviate/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from requests.exceptions import ConnectionError as RequestsConnectionError

from weaviate.cluster.types import Node
from weaviate.cluster.types import Node, ClusterStats
from weaviate.connect import Connection
from weaviate.exceptions import (
EmptyResponseException,
Expand Down Expand Up @@ -79,3 +79,33 @@ def get_nodes_status(
if nodes is None or nodes == []:
raise EmptyResponseException("Nodes status response returned empty")
return cast(List[Node], nodes)

def get_cluster_statistics(self) -> ClusterStats:
    """
    Get the cluster statistics including Raft consensus information.

    The decoded response is returned as received (it is only cast to the
    declared type, no keys are renamed in this method).

    Returns
    -------
    ClusterStats
        Statistics about the cluster including Raft consensus information.

    Raises
    ------
    requests.ConnectionError
        If the network connection to weaviate fails.
    weaviate.UnexpectedStatusCodeException
        If weaviate reports a non-OK status.
    weaviate.EmptyResponseException
        If the response is empty.
    """

    try:
        response = self._connection.get(path="/cluster/statistics")
    except RequestsConnectionError as conn_err:
        raise RequestsConnectionError(
            "Get cluster statistics failed due to connection error"
        ) from conn_err

    response_typed = _decode_json_response_dict(response, "Cluster statistics")
    # An empty JSON object has none of the ClusterStats keys, so it is rejected
    # together with a missing body (get_nodes_status likewise rejects an empty list).
    if not response_typed:
        raise EmptyResponseException("Cluster statistics response returned empty")
    return cast(ClusterStats, response_typed)
49 changes: 48 additions & 1 deletion weaviate/cluster/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Literal, Optional, TypedDict
from typing import List, Literal, Optional, TypedDict, Dict


class BatchStats(TypedDict):
Expand Down Expand Up @@ -34,3 +34,50 @@ class Node(TypedDict):
stats: Stats
status: str
version: str


class RaftPeer(TypedDict):
    """One entry of a node's Raft `latestConfiguration` list.

    Field names mirror the keys of the server response, because the response
    is returned without any renaming. The peer identifier therefore uses the
    key `id` (the API returns 'id', not 'id_').
    """

    address: str
    # Annotation only: the builtin `id` is not shadowed outside this class body.
    id: str
    suffrage: int


class RaftStats(TypedDict):
    """Raft section of a single node's cluster statistics.

    Every value is typed as a string, except `latestConfiguration`, which is
    a list of peers.
    """

    # Node state and term
    state: str
    term: str
    numPeers: str
    lastContact: str

    # Index / log counters
    appliedIndex: str
    commitIndex: str
    fsmPending: str
    lastLogIndex: str
    lastLogTerm: str

    # Snapshot counters
    lastSnapshotIndex: str
    lastSnapshotTerm: str

    # Latest configuration (peer list and its index)
    latestConfiguration: List[RaftPeer]
    latestConfigurationIndex: str

    # Protocol and snapshot version bounds
    protocolVersion: str
    protocolVersionMax: str
    protocolVersionMin: str
    snapshotVersionMax: str
    snapshotVersionMin: str


# total=False is used because some of the fields are optional
# (e.g. `bootstrapped` is not always present in the response).
class ClusterNodeStats(TypedDict, total=False):
    """Statistics of a single node as found in the `statistics` list.

    Field names mirror the keys of the server response, because the response
    is returned without any renaming. The open flag therefore uses the key
    `open` (the API returns 'open', not 'open_').
    """

    bootstrapped: bool
    candidates: Dict[str, str]
    dbLoaded: bool
    isVoter: bool
    leaderAddress: str
    leaderId: str
    name: str
    # Annotation only: the builtin `open` is not shadowed outside this class body.
    open: bool
    raft: RaftStats
    ready: bool
    status: str


class ClusterStats(TypedDict):
    """Top-level shape of the cluster statistics response."""

    # Cluster-wide boolean flag
    synchronized: bool
    # One entry per node
    statistics: List[ClusterNodeStats]

0 comments on commit 0f880d8

Please sign in to comment.