Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update actionGet to SuspendUntil for ClusterMetrics #1067

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.opensearch.action.admin.indices.recovery.RecoveryResponse
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest
import org.opensearch.client.Client
Expand All @@ -42,49 +43,50 @@ import kotlin.collections.HashMap
* @param client The [Client] used to call the respective transport action.
* @throws IllegalArgumentException When the requested API is not supported by this feature.
*/
fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
suspend fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
val request = resolveToActionRequest(clusterMetricsInput)
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> {
request as CatIndicesRequestWrapper
val healthResponse: ClusterHealthResponse =
client.admin().cluster().health(request.clusterHealthRequest).get()
val healthResponse: ClusterHealthResponse = client.suspendUntil { admin().cluster().health(request.clusterHealthRequest, it) }
val indexSettingsResponse: GetSettingsResponse =
client.admin().indices().getSettings(request.indexSettingsRequest).get()
client.suspendUntil { admin().indices().getSettings(request.indexSettingsRequest, it) }
val indicesResponse: IndicesStatsResponse =
client.admin().indices().stats(request.indicesStatsRequest).get()
client.suspendUntil { admin().indices().stats(request.indicesStatsRequest, it) }
val stateResponse: ClusterStateResponse =
client.admin().cluster().state(request.clusterStateRequest).get()
client.suspendUntil { admin().cluster().state(request.clusterStateRequest, it) }
return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS ->
client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest).get()
client.suspendUntil<Client, PendingClusterTasksResponse> {
admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest, it)
}
ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY ->
client.admin().indices().recoveries(request as RecoveryRequest).get()
client.suspendUntil<Client, RecoveryResponse> { admin().indices().recoveries(request as RecoveryRequest, it) }
ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> {
request as CatShardsRequestWrapper
val stateResponse: ClusterStateResponse =
client.admin().cluster().state(request.clusterStateRequest).get()
client.suspendUntil { admin().cluster().state(request.clusterStateRequest, it) }
val indicesResponse: IndicesStatsResponse =
client.admin().indices().stats(request.indicesStatsRequest).get()
client.suspendUntil { admin().indices().stats(request.indicesStatsRequest, it) }
return CatShardsResponseWrapper(stateResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS ->
client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get()
client.suspendUntil<Client, GetSnapshotsResponse> { admin().cluster().getSnapshots(request as GetSnapshotsRequest, it) }
ClusterMetricsInput.ClusterMetricType.CAT_TASKS ->
client.admin().cluster().listTasks(request as ListTasksRequest).get()
client.suspendUntil<Client, ListTasksResponse> { admin().cluster().listTasks(request as ListTasksRequest, it) }
ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH ->
client.admin().cluster().health(request as ClusterHealthRequest).get()
client.suspendUntil<Client, ClusterHealthResponse> { admin().cluster().health(request as ClusterHealthRequest, it) }
ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS -> {
val stateResponse: ClusterStateResponse =
client.admin().cluster().state(request as ClusterStateRequest).get()
client.suspendUntil { admin().cluster().state(request as ClusterStateRequest, it) }
val metadata: Metadata = stateResponse.state.metadata
return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY)
}
ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS ->
client.admin().cluster().clusterStats(request as ClusterStatsRequest).get()
client.suspendUntil<Client, ClusterStatsResponse> { admin().cluster().clusterStats(request as ClusterStatsRequest, it) }
ClusterMetricsInput.ClusterMetricType.NODES_STATS ->
client.admin().cluster().nodesStats(request as NodesStatsRequest).get()
client.suspendUntil<Client, NodesStatsResponse> { admin().cluster().nodesStats(request as NodesStatsRequest, it) }
else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) }
}

fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
suspend fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down Expand Up @@ -125,7 +125,7 @@ class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
}
}

fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
suspend fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() {
assertThrows(IllegalArgumentException::class.java) { CatShardsRequestWrapper(pathParams = pathParams) }
}

fun `test CatShardsResponseWrapper returns with only indices in pathParams`() {
suspend fun `test CatShardsResponseWrapper returns with only indices in pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down Expand Up @@ -117,7 +117,7 @@ class CatShardsWrappersIT : OpenSearchSingleNodeTestCase() {
}
}

fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() {
suspend fun `test CatShardsResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
Expand Down