Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Explain all and Get all API (#352)
Browse files Browse the repository at this point in the history
* transform explain to be able to get all
use SearchParams
fix sort order problem of cluster state
add enabled field
* differentiate explain and explain all response
* add get all policy API
* remove enable field from metadata
* normalize action name to be similar with other ODFE plugins
* totalPolicies to total_policies
total_managed_indices
bug fix: when query size = 0 still show total managed indices
  • Loading branch information
bowenlan-amzn authored Jan 27, 2021
1 parent 7e52f26 commit 2f7d778
Show file tree
Hide file tree
Showing 35 changed files with 929 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.deletepolicy.TransportDeletePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPoliciesAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.TransportGetPoliciesAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.TransportGetPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.IndexPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.TransportIndexPolicyAction
Expand Down Expand Up @@ -291,6 +293,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
ActionPlugin.ActionHandler(ExplainAction.INSTANCE, TransportExplainAction::class.java),
ActionPlugin.ActionHandler(DeletePolicyAction.INSTANCE, TransportDeletePolicyAction::class.java),
ActionPlugin.ActionHandler(GetPolicyAction.INSTANCE, TransportGetPolicyAction::class.java),
ActionPlugin.ActionHandler(GetPoliciesAction.INSTANCE, TransportGetPoliciesAction::class.java),
ActionPlugin.ActionHandler(DeleteRollupAction.INSTANCE, TransportDeleteRollupAction::class.java),
ActionPlugin.ActionHandler(GetRollupAction.INSTANCE, TransportGetRollupAction::class.java),
ActionPlugin.ActionHandler(GetRollupsAction.INSTANCE, TransportGetRollupsAction::class.java),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class IndexStateManagementHistory(
return response.isRolledOver
}

@Suppress("SpreadOperator", "NestedBlockDepth")
@Suppress("SpreadOperator", "NestedBlockDepth", "ComplexMethod")
private fun deleteOldHistoryIndex() {
val indexToDelete = mutableListOf<String>()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
* Initializes the change policy process where we will get the policy using the change policy's policyID, update the [ManagedIndexMetaData]
* to reflect the new policy, and save the new policy to the [ManagedIndexConfig] while resetting the change policy to null
*/
@Suppress("ReturnCount")
@Suppress("ReturnCount", "ComplexMethod")
private suspend fun initChangePolicy(
managedIndexConfig: ManagedIndexConfig,
managedIndexMetaData: ManagedIndexMetaData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ data class ManagedIndexMetaData(
val info: Map<String, Any>?
) : Writeable, ToXContentFragment {

@Suppress("ComplexMethod")
fun toMap(): Map<String, String> {
val resultMap = mutableMapOf<String, String> ()
resultMap[INDEX] = index
Expand Down Expand Up @@ -116,6 +117,7 @@ data class ManagedIndexMetaData(
}

if (info != null) builder.field(INFO, info)

return builder
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ data class Policy(
}

return Policy(
id,
seqNo,
primaryTerm,
requireNotNull(description) { "$DESCRIPTION_FIELD is null" },
schemaVersion,
lastUpdatedTime ?: Instant.now(),
errorNotification,
requireNotNull(defaultState) { "$DEFAULT_STATE_FIELD is null" },
states.toList()
id = id,
seqNo = seqNo,
primaryTerm = primaryTerm,
description = requireNotNull(description) { "$DESCRIPTION_FIELD is null" },
schemaVersion = schemaVersion,
lastUpdatedTime = lastUpdatedTime ?: Instant.now(),
errorNotification = errorNotification,
defaultState = requireNotNull(defaultState) { "$DEFAULT_STATE_FIELD is null" },
states = states.toList()
)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model

import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.io.stream.Writeable
import java.io.IOException

data class SearchParams(
val size: Int,
val from: Int,
val sortField: String,
val sortOrder: String,
val queryString: String
) : Writeable {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
size = sin.readInt(),
from = sin.readInt(),
sortField = sin.readString(),
sortOrder = sin.readString(),
queryString = sin.readString()
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeInt(size)
out.writeInt(from)
out.writeString(sortField)
out.writeString(sortOrder)
out.writeString(queryString)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ abstract class ActionConfig(
// TODO clean up for actionIndex
@JvmStatic
@Throws(IOException::class)
@Suppress("ComplexMethod")
fun fromStreamInput(sin: StreamInput): ActionConfig {
val type = sin.readEnum(ActionType::class.java)
val actionIndex = sin.readInt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@
package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.ISM_BASE_URI
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.SearchParams
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_JOB_SORT_FIELD
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_PAGINATION_FROM
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_PAGINATION_SIZE
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_QUERY_STRING
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_SORT_ORDER
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.support.master.MasterNodeRequest
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.Strings
Expand All @@ -28,6 +35,8 @@ import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.RestRequest.Method.GET
import org.elasticsearch.rest.action.RestToXContentListener

private val log = LogManager.getLogger(RestExplainAction::class.java)

class RestExplainAction : BaseRestHandler() {

companion object {
Expand All @@ -47,13 +56,22 @@ class RestExplainAction : BaseRestHandler() {

@Suppress("SpreadOperator") // There is no way around dealing with java vararg without spread operator.
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val indices: Array<String>? = Strings.splitStringByCommaToArray(request.param("index"))
if (indices == null || indices.isEmpty()) {
throw IllegalArgumentException("Missing indices")
}
log.debug("${request.method()} ${request.path()}")

val indices: Array<String> = Strings.splitStringByCommaToArray(request.param("index"))

val explainRequest = ExplainRequest(indices.toList(), request.paramAsBoolean("local", false),
request.paramAsTime("master_timeout", MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT))
val size = request.paramAsInt("size", DEFAULT_PAGINATION_SIZE)
val from = request.paramAsInt("from", DEFAULT_PAGINATION_FROM)
val sortField = request.param("sortField", DEFAULT_JOB_SORT_FIELD)
val sortOrder = request.param("sortOrder", DEFAULT_SORT_ORDER)
val queryString = request.param("queryString", DEFAULT_QUERY_STRING)

val explainRequest = ExplainRequest(
indices.toList(),
request.paramAsBoolean("local", false),
request.paramAsTime("master_timeout", MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT),
SearchParams(size, from, sortField, sortOrder, queryString)
)

return RestChannelConsumer { channel ->
client.execute(ExplainAction.INSTANCE, explainRequest, RestToXContentListener(channel))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,17 @@
package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.POLICY_BASE_URI
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.SearchParams
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPoliciesAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPoliciesRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPolicyRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_PAGINATION_FROM
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_PAGINATION_SIZE
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_POLICY_SORT_FIELD
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_QUERY_STRING
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_SORT_ORDER
import org.apache.logging.log4j.LogManager
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer
Expand All @@ -29,10 +38,13 @@ import org.elasticsearch.rest.action.RestActions
import org.elasticsearch.rest.action.RestToXContentListener
import org.elasticsearch.search.fetch.subphase.FetchSourceContext

private val log = LogManager.getLogger(RestGetPolicyAction::class.java)

class RestGetPolicyAction : BaseRestHandler() {

override fun routes(): List<Route> {
return listOf(
Route(GET, POLICY_BASE_URI),
Route(GET, "$POLICY_BASE_URI/{policyID}"),
Route(HEAD, "$POLICY_BASE_URI/{policyID}")
)
Expand All @@ -43,20 +55,29 @@ class RestGetPolicyAction : BaseRestHandler() {
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val policyId = request.param("policyID")
log.debug("${request.method()} ${request.path()}")

if (policyId == null || policyId.isEmpty()) {
throw IllegalArgumentException("Missing policy ID")
}
val policyId = request.param("policyID")

var fetchSrcContext: FetchSourceContext = FetchSourceContext.FETCH_SOURCE
if (request.method() == HEAD) {
fetchSrcContext = FetchSourceContext.DO_NOT_FETCH_SOURCE
}
val getPolicyRequest = GetPolicyRequest(policyId, RestActions.parseVersion(request), fetchSrcContext)

val size = request.paramAsInt("size", DEFAULT_PAGINATION_SIZE)
val from = request.paramAsInt("from", DEFAULT_PAGINATION_FROM)
val sortField = request.param("sortField", DEFAULT_POLICY_SORT_FIELD)
val sortOrder = request.param("sortOrder", DEFAULT_SORT_ORDER)
val queryString = request.param("queryString", DEFAULT_QUERY_STRING)

return RestChannelConsumer { channel ->
client.execute(GetPolicyAction.INSTANCE, getPolicyRequest, RestToXContentListener(channel))
if (policyId == null || policyId.isEmpty()) {
val getPoliciesRequest = GetPoliciesRequest(SearchParams(size, from, sortField, sortOrder, queryString))
client.execute(GetPoliciesAction.INSTANCE, getPoliciesRequest, RestToXContentListener(channel))
} else {
val getPolicyRequest = GetPolicyRequest(policyId, RestActions.parseVersion(request), fetchSrcContext)
client.execute(GetPolicyAction.INSTANCE, getPolicyRequest, RestToXContentListener(channel))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class AttemptCallForceMergeStep(

override fun isIdempotent() = false

@Suppress("TooGenericExceptionCaught")
@Suppress("TooGenericExceptionCaught", "ComplexMethod")
override suspend fun execute(): AttemptCallForceMergeStep {
try {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class TransportChangePolicyAction @Inject constructor(
ChangePolicyHandler(client, listener, request).start()
}

@Suppress("TooManyFunctions")
inner class ChangePolicyHandler(
private val client: NodeClient,
private val actionListener: ActionListener<ISMStatusResponse>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import java.io.IOException

class ExplainAllResponse : ExplainResponse, ToXContentObject {

val totalManagedIndices: Int
val enabledState: Map<String, Boolean>

constructor(
indexNames: List<String>,
indexPolicyIDs: List<String?>,
indexMetadatas: List<ManagedIndexMetaData?>,
totalManagedIndices: Int,
enabledState: Map<String, Boolean>
) : super(indexNames, indexPolicyIDs, indexMetadatas) {
this.totalManagedIndices = totalManagedIndices
this.enabledState = enabledState
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
indexNames = sin.readStringList(),
indexPolicyIDs = sin.readStringList(),
indexMetadatas = sin.readList { ManagedIndexMetaData.fromStreamInput(it) },
totalManagedIndices = sin.readInt(),
enabledState = sin.readMap() as Map<String, Boolean>
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeInt(totalManagedIndices)
out.writeMap(enabledState)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
indexNames.forEachIndexed { ind, name ->
builder.startObject(name)
builder.field(ManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind])
indexMetadatas[ind]?.toXContent(builder, ToXContent.EMPTY_PARAMS)
builder.field("enabled", enabledState[name])
builder.endObject()
}
builder.field("total_managed_indices", totalManagedIndices)
return builder.endObject()
}
}
Loading

0 comments on commit 2f7d778

Please sign in to comment.