Skip to content

Commit

Permalink
calculate metrics based on list size, add stronger assertions to tests
Browse files Browse the repository at this point in the history
Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
  • Loading branch information
Rahul Karajgikar committed Sep 25, 2024
1 parent 8d74b36 commit bd80ae6
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
);
asyncFetchSuccessCounter = metricsRegistry.createCounter(
"async.fetch.success.count",
"Counter for total number of async fetches",
"Counter for number of successful async fetches",
COUNTER_METRICS_UNIT
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ Map<DiscoveryNode, K> getCacheData(DiscoveryNodes nodes, Set<String> failedNodes
}

void processResponses(List<K> responses, long fetchingRound) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchSuccessCounter, Double.valueOf(responses.size()));
for (K response : responses) {
BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId());
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchSuccessCounter, 1.0);
if (nodeEntry != null) {
if (validateNodeResponse(nodeEntry, fetchingRound)) {
// if the entry is there, for the right fetching round and not marked as failed already, process it
Expand Down Expand Up @@ -226,9 +226,9 @@ boolean retryableException(Throwable unwrappedCause) {
}

void processFailures(List<FailedNodeException> failures, long fetchingRound) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, Double.valueOf(failures.size()));
for (FailedNodeException failure : failures) {
logger.trace("processing failure {} for [{}]", failure, type);
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, 1.0);
BaseNodeEntry nodeEntry = getCache().get(failure.nodeId());
if (nodeEntry != null) {
handleNodeFailure(nodeEntry, failure, fetchingRound);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public void testClose() throws Exception {
// fire a response, wait on reroute incrementing
test.fireSimulationAndWait(node1.getId());
// counter goes up because fetch completed
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand All @@ -141,6 +142,7 @@ public void testClose() throws Exception {
try {
test.fetchData(nodes, emptyMap());
// counter should not go up when calling fetchData since fetch never completed
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());
fail("fetch data should fail when closed");
Expand All @@ -161,6 +163,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception {
// fire a response, wait on reroute incrementing
test.fireSimulationAndWait(node1.getId());
// total counter goes up by 1 after success
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand All @@ -171,6 +174,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception {
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
// counter remains same because fetchData does not trigger new async fetch
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());
}
Expand All @@ -188,6 +192,7 @@ public void testFullCircleSingleNodeFailure() throws Exception {
// fire a response, wait on reroute incrementing
test.fireSimulationAndWait(node1.getId());
verify(asyncFetchSuccessCounter, times(0)).add(anyDouble());
verify(asyncFetchFailureCounter, times(1)).add(1.0);
verify(asyncFetchFailureCounter, times(1)).add(anyDouble());

// failure, fetched data exists, but has no data
Expand All @@ -197,6 +202,7 @@ public void testFullCircleSingleNodeFailure() throws Exception {
assertThat(fetchData.getData().size(), equalTo(0));
// counter remains same because fetchData does not trigger new async fetch
verify(asyncFetchSuccessCounter, times(0)).add(anyDouble());
verify(asyncFetchFailureCounter, times(1)).add(1.0);
verify(asyncFetchFailureCounter, times(1)).add(anyDouble());

// on failure, we reset the failure on a successive call to fetchData, and try again afterwards
Expand All @@ -205,11 +211,14 @@ public void testFullCircleSingleNodeFailure() throws Exception {
assertThat(fetchData.hasData(), equalTo(false));
// No additional failure, empty data so no change in counter
verify(asyncFetchSuccessCounter, times(0)).add(anyDouble());
verify(asyncFetchFailureCounter, times(1)).add(1.0);
verify(asyncFetchFailureCounter, times(1)).add(anyDouble());

test.fireSimulationAndWait(node1.getId());
// Success counter will increase
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(1)).add(1.0);
verify(asyncFetchFailureCounter, times(1)).add(anyDouble());

// 2 reroutes, cause we have a failure that we clear
Expand All @@ -219,7 +228,9 @@ public void testFullCircleSingleNodeFailure() throws Exception {
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
// counter remains same because fetchData does not trigger new async fetch
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(1)).add(1.0);
verify(asyncFetchFailureCounter, times(1)).add(anyDouble());
}

Expand All @@ -239,15 +250,20 @@ public void testIgnoreResponseFromDifferentRound() throws Exception {
test.processAsyncFetch(Collections.singletonList(response1), Collections.emptyList(), 0);
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(1));
// counter increments to 1 because we called processAsyncFetch with a valid response, even though the round was incorrect
// success counter increments to 1 because we called processAsyncFetch with a valid response, even though the round was incorrect
// failure counter also increments by 1 with empty list
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());
verify(asyncFetchFailureCounter, times(1)).add(0.0);
verify(asyncFetchFailureCounter, times(1)).add(anyDouble());

// fire a response (with correct round id), wait on reroute incrementing
test.fireSimulationAndWait(node1.getId());
// total counter now goes up by 1 because fetchData completed
// success counter now goes up by 1 because fetchData completed
verify(asyncFetchSuccessCounter, times(2)).add(1.0);
verify(asyncFetchSuccessCounter, times(2)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());
verify(asyncFetchFailureCounter, times(1)).add(0.0);
verify(asyncFetchFailureCounter, times(1)).add(anyDouble());

// verify we get back the data node
assertThat(test.reroute.get(), equalTo(2));
Expand All @@ -256,8 +272,10 @@ public void testIgnoreResponseFromDifferentRound() throws Exception {
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
// total counter remains same because fetchdata does not trigger new async fetch
verify(asyncFetchSuccessCounter, times(2)).add(1.0);
verify(asyncFetchSuccessCounter, times(2)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());
verify(asyncFetchFailureCounter, times(1)).add(0.0);
verify(asyncFetchFailureCounter, times(1)).add(anyDouble());
}

public void testIgnoreFailureFromDifferentRound() throws Exception {
Expand All @@ -281,22 +299,29 @@ public void testIgnoreFailureFromDifferentRound() throws Exception {
);
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(1));
// success counter called with empty list
// failure counter goes up by 1 because of the failure
verify(asyncFetchSuccessCounter, times(0)).add(anyDouble());
verify(asyncFetchSuccessCounter, times(1)).add(0.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(1)).add(1.0);
verify(asyncFetchFailureCounter, times(1)).add(anyDouble());

// fire a response, wait on reroute incrementing
test.fireSimulationAndWait(node1.getId());
// failure counter goes up by 1 because of the failure
verify(asyncFetchSuccessCounter, times(0)).add(anyDouble());
verify(asyncFetchSuccessCounter, times(1)).add(0.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(2)).add(1.0);
verify(asyncFetchFailureCounter, times(2)).add(anyDouble());
// failure, fetched data exists, but has no data
assertThat(test.reroute.get(), equalTo(2));
fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(0));
// counters remain same because fetchData does not trigger new async fetch
verify(asyncFetchSuccessCounter, times(0)).add(anyDouble());
verify(asyncFetchSuccessCounter, times(1)).add(0.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(2)).add(1.0);
verify(asyncFetchFailureCounter, times(2)).add(anyDouble());
}

Expand All @@ -316,6 +341,7 @@ public void testTwoNodesOnSetup() throws Exception {
// fire the first response, it should trigger a reroute
test.fireSimulationAndWait(node1.getId());
// counter 1 because one fetch completed
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand All @@ -324,12 +350,14 @@ public void testTwoNodesOnSetup() throws Exception {
fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(false));
// counter still 1 because fetchData did not trigger new async fetch
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

// fire the second simulation, this should allow us to get the data
test.fireSimulationAndWait(node2.getId());
// counter 2 because 2 fetches completed
verify(asyncFetchSuccessCounter, times(2)).add(1.0);
verify(asyncFetchSuccessCounter, times(2)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());
// no more ongoing requests, we should fetch the data
Expand All @@ -340,6 +368,7 @@ public void testTwoNodesOnSetup() throws Exception {
assertThat(fetchData.getData().get(node1), sameInstance(response1));
assertThat(fetchData.getData().get(node2), sameInstance(response2));
// counter still 2 because fetchData call did not trigger new async fetch
verify(asyncFetchSuccessCounter, times(2)).add(1.0);
verify(asyncFetchSuccessCounter, times(2)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());
}
Expand All @@ -363,13 +392,16 @@ public void testTwoNodesOnSetupAndFailure() throws Exception {
fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(false));
// counter 1 because one fetch completed
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

// fire the second simulation, this should allow us to get the data
test.fireSimulationAndWait(node2.getId());
// failure counter up by 1 because one fetch failed
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(1)).add(1.0);
verify(asyncFetchFailureCounter, times(1)).add(anyDouble());
assertThat(test.reroute.get(), equalTo(2));

Expand All @@ -379,7 +411,9 @@ public void testTwoNodesOnSetupAndFailure() throws Exception {
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
// success and failure counters same because fetchData did not trigger new async fetch
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(1)).add(1.0);
verify(asyncFetchFailureCounter, times(1)).add(anyDouble());
}

Expand All @@ -398,6 +432,7 @@ public void testTwoNodesAddedInBetween() throws Exception {
// fire the first response, it should trigger a reroute
test.fireSimulationAndWait(node1.getId());
// counter 1 because fetch completed
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand All @@ -408,12 +443,14 @@ public void testTwoNodesAddedInBetween() throws Exception {
fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(false));
// counter still 1 because second fetch ongoing
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

// fire the second simulation, this should allow us to get the data
test.fireSimulationAndWait(node2.getId());
// counter now 2 because 2 fetches completed
verify(asyncFetchSuccessCounter, times(2)).add(1.0);
verify(asyncFetchSuccessCounter, times(2)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand All @@ -424,6 +461,7 @@ public void testTwoNodesAddedInBetween() throws Exception {
assertThat(fetchData.getData().get(node1), sameInstance(response1));
assertThat(fetchData.getData().get(node2), sameInstance(response2));
// counter still 2 because fetchData did not trigger new async fetch
verify(asyncFetchSuccessCounter, times(2)).add(1.0);
verify(asyncFetchSuccessCounter, times(2)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());
}
Expand All @@ -446,6 +484,7 @@ public void testClearCache() throws Exception {
test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(1));
// counter 1 because 1 fetch completed
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand All @@ -455,6 +494,7 @@ public void testClearCache() throws Exception {
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
// counter still 1 because a new fetch is not called
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand All @@ -464,6 +504,7 @@ public void testClearCache() throws Exception {
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
// counter still 1 because a new fetch is not called
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand All @@ -476,12 +517,14 @@ public void testClearCache() throws Exception {
fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(false));
// counter still 1 because new fetch is still ongoing
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(2));
// counter now 2 because second fetch completed
verify(asyncFetchSuccessCounter, times(2)).add(1.0);
verify(asyncFetchSuccessCounter, times(2)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand All @@ -491,6 +534,7 @@ public void testClearCache() throws Exception {
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1_2));
// counter still 2 because fetchData did not trigger new async fetch
verify(asyncFetchSuccessCounter, times(2)).add(1.0);
verify(asyncFetchSuccessCounter, times(2)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());
}
Expand All @@ -513,6 +557,7 @@ public void testConcurrentRequestAndClearCache() throws Exception {
test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(1));
// counter 1 because fetch completed, even though cache was wiped
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand All @@ -523,12 +568,14 @@ public void testConcurrentRequestAndClearCache() throws Exception {
fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(false));
// counter unchanged because fetch ongoing
verify(asyncFetchSuccessCounter, times(1)).add(1.0);
verify(asyncFetchSuccessCounter, times(1)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(2));
// counter 2 because second fetch completed
verify(asyncFetchSuccessCounter, times(2)).add(1.0);
verify(asyncFetchSuccessCounter, times(2)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand All @@ -538,6 +585,7 @@ public void testConcurrentRequestAndClearCache() throws Exception {
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1_2));
// counter unchanged because fetchData does not trigger new async fetch
verify(asyncFetchSuccessCounter, times(2)).add(1.0);
verify(asyncFetchSuccessCounter, times(2)).add(anyDouble());
verify(asyncFetchFailureCounter, times(0)).add(anyDouble());

Expand Down

0 comments on commit bd80ae6

Please sign in to comment.