From 2f7d778a4447e7c64b3e57fac54e08f8ae3829b6 Mon Sep 17 00:00:00 2001 From: Bowen Lan <62091230+bowenlan-amzn@users.noreply.github.com> Date: Wed, 27 Jan 2021 15:10:58 -0800 Subject: [PATCH] Explain all and Get all API (#352) * transform explain to be able to get all use SearchParams fix sort order problem of cluster state add enabled field * differentiate explain and explain all response * add get all policy API * remove enable field from metadata * normalize action name to be similar with other ODFE plugins * totalPolicies to total_policies total_managed_indices bug fix: when query size = 0 still show total managed indices --- .../indexmanagement/IndexManagementPlugin.kt | 3 + .../IndexStateManagementHistory.kt | 2 +- .../ManagedIndexRunner.kt | 2 +- .../model/ManagedIndexMetaData.kt | 2 + .../indexstatemanagement/model/Policy.kt | 18 +- .../model/SearchParams.kt | 48 +++++ .../model/action/ActionConfig.kt | 1 + .../resthandler/RestExplainAction.kt | 30 ++- .../resthandler/RestGetPolicyAction.kt | 33 +++- .../forcemerge/AttemptCallForceMergeStep.kt | 2 +- .../TransportChangePolicyAction.kt | 1 + .../action/explain/ExplainAllResponse.kt | 71 +++++++ .../action/explain/ExplainRequest.kt | 17 +- .../action/explain/ExplainResponse.kt | 5 +- .../action/explain/TransportExplainAction.kt | 181 ++++++++++++++++-- .../action/getpolicy/GetPoliciesAction.kt | 25 +++ .../action/getpolicy/GetPoliciesRequest.kt | 48 +++++ .../action/getpolicy/GetPoliciesResponse.kt | 72 +++++++ .../action/getpolicy/GetPolicyAction.kt | 2 +- .../getpolicy/TransportGetPoliciesAction.kt | 109 +++++++++++ .../util/RestHandlerUtils.kt | 7 + .../RefreshSearchAnalyzerRequest.kt | 1 + .../IndexManagementIndicesIT.kt | 14 +- .../IndexStateManagementRestTestCase.kt | 16 +- .../action/ActionTimeoutIT.kt | 16 +- .../IndexStateManagementRestApiIT.kt | 89 +++++++-- .../resthandler/RestExplainActionIT.kt | 121 ++++++++---- .../RestRetryFailedManagedIndexActionIT.kt | 8 +- .../runner/ManagedIndexRunnerIT.kt | 8 +- .../transport/action/ActionTests.kt | 6 + .../action/explain/ExplainRequestTests.kt | 4 +- .../action/explain/ExplainResponseTests.kt | 34 ++++ .../getpolicy/GetPoliciesRequestTests.kt | 35 ++++ .../getpolicy/GetPoliciesResponseTests.kt | 37 ++++ .../getpolicy/GetPolicyResponseTests.kt | 2 +- 35 files changed, 929 insertions(+), 141 deletions(-) create mode 100644 src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/SearchParams.kt create mode 100644 src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainAllResponse.kt create mode 100644 src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesAction.kt create mode 100644 src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesRequest.kt create mode 100644 src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponse.kt create mode 100644 src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt create mode 100644 src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesRequestTests.kt create mode 100644 src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponseTests.kt diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt index ae31e6055..837557fd6 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt @@ -39,7 +39,9 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.deletepolicy.TransportDeletePolicyAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPoliciesAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPolicyAction +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.TransportGetPoliciesAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.TransportGetPolicyAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.IndexPolicyAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.TransportIndexPolicyAction @@ -291,6 +293,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act ActionPlugin.ActionHandler(ExplainAction.INSTANCE, TransportExplainAction::class.java), ActionPlugin.ActionHandler(DeletePolicyAction.INSTANCE, TransportDeletePolicyAction::class.java), ActionPlugin.ActionHandler(GetPolicyAction.INSTANCE, TransportGetPolicyAction::class.java), + ActionPlugin.ActionHandler(GetPoliciesAction.INSTANCE, TransportGetPoliciesAction::class.java), ActionPlugin.ActionHandler(DeleteRollupAction.INSTANCE, TransportDeleteRollupAction::class.java), ActionPlugin.ActionHandler(GetRollupAction.INSTANCE, TransportGetRollupAction::class.java), ActionPlugin.ActionHandler(GetRollupsAction.INSTANCE, TransportGetRollupsAction::class.java), diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt index 1101ce7a8..700cc233d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt @@ -129,7 +129,7 @@ class IndexStateManagementHistory( return response.isRolledOver } - @Suppress("SpreadOperator", "NestedBlockDepth") + @Suppress("SpreadOperator", "NestedBlockDepth", "ComplexMethod") private fun deleteOldHistoryIndex() { val indexToDelete = mutableListOf() diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 626839cc2..7d9104d57 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -534,7 +534,7 @@ object ManagedIndexRunner : ScheduledJobRunner, * Initializes the change policy process where we will get the policy using the change policy's policyID, update the [ManagedIndexMetaData] * to reflect the new policy, and save the new policy to the [ManagedIndexConfig] while resetting the change policy to null */ - @Suppress("ReturnCount") + @Suppress("ReturnCount", "ComplexMethod") private suspend fun initChangePolicy( managedIndexConfig: ManagedIndexConfig, managedIndexMetaData: ManagedIndexMetaData, diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaData.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaData.kt index 934c3278e..98ff92994 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaData.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/ManagedIndexMetaData.kt @@ -50,6 +50,7 @@ data class ManagedIndexMetaData( val info: Map? ) : Writeable, ToXContentFragment { + @Suppress("ComplexMethod") fun toMap(): Map { val resultMap = mutableMapOf () resultMap[INDEX] = index @@ -116,6 +117,7 @@ data class ManagedIndexMetaData( } if (info != null) builder.field(INFO, info) + return builder } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/Policy.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/Policy.kt index 10fe89fc8..7e20203be 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/Policy.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/Policy.kt @@ -153,15 +153,15 @@ data class Policy( } return Policy( - id, - seqNo, - primaryTerm, - requireNotNull(description) { "$DESCRIPTION_FIELD is null" }, - schemaVersion, - lastUpdatedTime ?: Instant.now(), - errorNotification, - requireNotNull(defaultState) { "$DEFAULT_STATE_FIELD is null" }, - states.toList() + id = id, + seqNo = seqNo, + primaryTerm = primaryTerm, + description = requireNotNull(description) { "$DESCRIPTION_FIELD is null" }, + schemaVersion = schemaVersion, + lastUpdatedTime = lastUpdatedTime ?: Instant.now(), + errorNotification = errorNotification, + defaultState = requireNotNull(defaultState) { "$DEFAULT_STATE_FIELD is null" }, + states = states.toList() ) } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/SearchParams.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/SearchParams.kt new file mode 100644 index 000000000..d3ee27b89 --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/SearchParams.kt @@ -0,0 +1,48 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model + +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.StreamOutput +import org.elasticsearch.common.io.stream.Writeable +import java.io.IOException + +data class SearchParams( + val size: Int, + val from: Int, + val sortField: String, + val sortOrder: String, + val queryString: String +) : Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + size = sin.readInt(), + from = sin.readInt(), + sortField = sin.readString(), + sortOrder = sin.readString(), + queryString = sin.readString() + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeInt(size) + out.writeInt(from) + out.writeString(sortField) + out.writeString(sortOrder) + out.writeString(queryString) + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/action/ActionConfig.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/action/ActionConfig.kt index 1fc118367..64ed6d300 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/action/ActionConfig.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/action/ActionConfig.kt @@ -87,6 +87,7 @@ abstract class ActionConfig( // TODO clean up for actionIndex @JvmStatic @Throws(IOException::class) + @Suppress("ComplexMethod") fun fromStreamInput(sin: StreamInput): ActionConfig { val type = sin.readEnum(ActionType::class.java) val actionIndex = sin.readInt() diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestExplainAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestExplainAction.kt index 03e2cbb42..fd0fbd46f 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestExplainAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestExplainAction.kt @@ -16,8 +16,15 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.ISM_BASE_URI +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.SearchParams import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainRequest +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_JOB_SORT_FIELD +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_PAGINATION_FROM +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_PAGINATION_SIZE +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_QUERY_STRING +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_SORT_ORDER +import org.apache.logging.log4j.LogManager import org.elasticsearch.action.support.master.MasterNodeRequest import org.elasticsearch.client.node.NodeClient import org.elasticsearch.common.Strings @@ -28,6 +35,8 @@ import org.elasticsearch.rest.RestRequest import org.elasticsearch.rest.RestRequest.Method.GET import org.elasticsearch.rest.action.RestToXContentListener +private val log = LogManager.getLogger(RestExplainAction::class.java) + class RestExplainAction : BaseRestHandler() { companion object { @@ -47,13 +56,22 @@ class RestExplainAction : BaseRestHandler() { @Suppress("SpreadOperator") // There is no way around dealing with java vararg without spread operator. override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val indices: Array? = Strings.splitStringByCommaToArray(request.param("index")) - if (indices == null || indices.isEmpty()) { - throw IllegalArgumentException("Missing indices") - } + log.debug("${request.method()} ${request.path()}") + + val indices: Array = Strings.splitStringByCommaToArray(request.param("index")) - val explainRequest = ExplainRequest(indices.toList(), request.paramAsBoolean("local", false), - request.paramAsTime("master_timeout", MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT)) + val size = request.paramAsInt("size", DEFAULT_PAGINATION_SIZE) + val from = request.paramAsInt("from", DEFAULT_PAGINATION_FROM) + val sortField = request.param("sortField", DEFAULT_JOB_SORT_FIELD) + val sortOrder = request.param("sortOrder", DEFAULT_SORT_ORDER) + val queryString = request.param("queryString", DEFAULT_QUERY_STRING) + + val explainRequest = ExplainRequest( + indices.toList(), + request.paramAsBoolean("local", false), + request.paramAsTime("master_timeout", MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT), + SearchParams(size, from, sortField, sortOrder, queryString) + ) return RestChannelConsumer { channel -> client.execute(ExplainAction.INSTANCE, explainRequest, RestToXContentListener(channel)) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestGetPolicyAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestGetPolicyAction.kt index a04575040..beda18505 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestGetPolicyAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestGetPolicyAction.kt @@ -16,8 +16,17 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.POLICY_BASE_URI +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.SearchParams +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPoliciesAction +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPoliciesRequest import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPolicyAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPolicyRequest +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_PAGINATION_FROM +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_PAGINATION_SIZE +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_POLICY_SORT_FIELD +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_QUERY_STRING +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.DEFAULT_SORT_ORDER +import org.apache.logging.log4j.LogManager import org.elasticsearch.client.node.NodeClient import org.elasticsearch.rest.BaseRestHandler import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer @@ -29,10 +38,13 @@ import org.elasticsearch.rest.action.RestActions import org.elasticsearch.rest.action.RestToXContentListener import org.elasticsearch.search.fetch.subphase.FetchSourceContext +private val log = LogManager.getLogger(RestGetPolicyAction::class.java) + class RestGetPolicyAction : BaseRestHandler() { override fun routes(): List { return listOf( + Route(GET, POLICY_BASE_URI), Route(GET, "$POLICY_BASE_URI/{policyID}"), Route(HEAD, "$POLICY_BASE_URI/{policyID}") ) @@ -43,20 +55,29 @@ class RestGetPolicyAction : BaseRestHandler() { } override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val policyId = request.param("policyID") + log.debug("${request.method()} ${request.path()}") - if (policyId == null || policyId.isEmpty()) { - throw IllegalArgumentException("Missing policy ID") - } + val policyId = request.param("policyID") var fetchSrcContext: FetchSourceContext = FetchSourceContext.FETCH_SOURCE if (request.method() == HEAD) { fetchSrcContext = FetchSourceContext.DO_NOT_FETCH_SOURCE } - val getPolicyRequest = GetPolicyRequest(policyId, RestActions.parseVersion(request), fetchSrcContext) + + val size = request.paramAsInt("size", DEFAULT_PAGINATION_SIZE) + val from = request.paramAsInt("from", DEFAULT_PAGINATION_FROM) + val sortField = request.param("sortField", DEFAULT_POLICY_SORT_FIELD) + val sortOrder = request.param("sortOrder", DEFAULT_SORT_ORDER) + val queryString = request.param("queryString", DEFAULT_QUERY_STRING) return RestChannelConsumer { channel -> - client.execute(GetPolicyAction.INSTANCE, getPolicyRequest, RestToXContentListener(channel)) + if (policyId == null || policyId.isEmpty()) { + val getPoliciesRequest = GetPoliciesRequest(SearchParams(size, from, sortField, sortOrder, queryString)) + client.execute(GetPoliciesAction.INSTANCE, getPoliciesRequest, RestToXContentListener(channel)) + } else { + val getPolicyRequest = GetPolicyRequest(policyId, RestActions.parseVersion(request), fetchSrcContext) + client.execute(GetPolicyAction.INSTANCE, getPolicyRequest, RestToXContentListener(channel)) + } } } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt index 52622cee7..c8523dd87 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt @@ -50,7 +50,7 @@ class AttemptCallForceMergeStep( override fun isIdempotent() = false - @Suppress("TooGenericExceptionCaught") + @Suppress("TooGenericExceptionCaught", "ComplexMethod") override suspend fun execute(): AttemptCallForceMergeStep { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt index 5248f7f88..f83e3a88d 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt @@ -73,6 +73,7 @@ class TransportChangePolicyAction @Inject constructor( ChangePolicyHandler(client, listener, request).start() } + @Suppress("TooManyFunctions") inner class ChangePolicyHandler( private val client: NodeClient, private val actionListener: ActionListener, diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainAllResponse.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainAllResponse.kt new file mode 100644 index 000000000..22d922cdb --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainAllResponse.kt @@ -0,0 +1,71 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain + +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.StreamOutput +import org.elasticsearch.common.xcontent.ToXContent +import org.elasticsearch.common.xcontent.ToXContentObject +import org.elasticsearch.common.xcontent.XContentBuilder +import java.io.IOException + +class ExplainAllResponse : ExplainResponse, ToXContentObject { + + val totalManagedIndices: Int + val enabledState: Map + + constructor( + indexNames: List, + indexPolicyIDs: List, + indexMetadatas: List, + totalManagedIndices: Int, + enabledState: Map + ) : super(indexNames, indexPolicyIDs, indexMetadatas) { + this.totalManagedIndices = totalManagedIndices + this.enabledState = enabledState + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + indexNames = sin.readStringList(), + indexPolicyIDs = sin.readStringList(), + indexMetadatas = sin.readList { ManagedIndexMetaData.fromStreamInput(it) }, + totalManagedIndices = sin.readInt(), + enabledState = sin.readMap() as Map + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeInt(totalManagedIndices) + out.writeMap(enabledState) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + indexNames.forEachIndexed { ind, name -> + builder.startObject(name) + builder.field(ManagedIndexSettings.POLICY_ID.key, indexPolicyIDs[ind]) + indexMetadatas[ind]?.toXContent(builder, ToXContent.EMPTY_PARAMS) + builder.field("enabled", enabledState[name]) + builder.endObject() + } + builder.field("total_managed_indices", totalManagedIndices) + return builder.endObject() + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainRequest.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainRequest.kt index 6796a926c..f0b582e12 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainRequest.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainRequest.kt @@ -15,9 +15,9 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.SearchParams import org.elasticsearch.action.ActionRequest import org.elasticsearch.action.ActionRequestValidationException -import org.elasticsearch.action.ValidateActions import org.elasticsearch.common.io.stream.StreamInput import org.elasticsearch.common.io.stream.StreamOutput import org.elasticsearch.common.unit.TimeValue @@ -28,30 +28,30 @@ class ExplainRequest : ActionRequest { val indices: List val local: Boolean val masterTimeout: TimeValue + val searchParams: SearchParams constructor( indices: List, local: Boolean, - masterTimeout: TimeValue + masterTimeout: TimeValue, + searchParams: SearchParams ) : super() { this.indices = indices this.local = local this.masterTimeout = masterTimeout + this.searchParams = searchParams } @Throws(IOException::class) constructor(sin: StreamInput) : this( indices = sin.readStringList(), local = sin.readBoolean(), - masterTimeout = sin.readTimeValue() + masterTimeout = sin.readTimeValue(), + searchParams = SearchParams(sin) ) override fun validate(): ActionRequestValidationException? { - var validationException: ActionRequestValidationException? = null - if (indices.isEmpty()) { - validationException = ValidateActions.addValidationError("Missing indices", validationException) - } - return validationException + return null } @Throws(IOException::class) @@ -59,5 +59,6 @@ class ExplainRequest : ActionRequest { out.writeStringCollection(indices) out.writeBoolean(local) out.writeTimeValue(masterTimeout) + searchParams.writeTo(out) } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt index e90cf6ee2..8544f4274 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponse.kt @@ -24,10 +24,10 @@ import org.elasticsearch.common.xcontent.ToXContent import org.elasticsearch.common.xcontent.ToXContentObject import org.elasticsearch.common.xcontent.XContentBuilder import java.io.IOException -import java.util.* -class ExplainResponse : ActionResponse, ToXContentObject { +open class ExplainResponse : ActionResponse, ToXContentObject { + // TODO refactor these lists usage to map val indexNames: List val indexPolicyIDs: List val indexMetadatas: List @@ -49,6 +49,7 @@ class ExplainResponse : ActionResponse, ToXContentObject { indexMetadatas = sin.readList { ManagedIndexMetaData.fromStreamInput(it) } ) + @Throws(IOException::class) override fun writeTo(out: StreamOutput) { out.writeStringCollection(indexNames) out.writeStringCollection(indexPolicyIDs) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index 7906182a8..6edb680ff 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -15,19 +15,32 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain -import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID +import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator.Companion.MAX_HITS import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData +import org.apache.logging.log4j.LogManager +import org.elasticsearch.index.IndexNotFoundException import org.elasticsearch.action.ActionListener import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse +import org.elasticsearch.action.search.SearchRequest +import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.action.support.ActionFilters import org.elasticsearch.action.support.HandledTransportAction import org.elasticsearch.action.support.IndicesOptions import org.elasticsearch.client.node.NodeClient import org.elasticsearch.common.inject.Inject +import org.elasticsearch.index.query.Operator +import org.elasticsearch.index.query.QueryBuilders +import org.elasticsearch.search.builder.SearchSourceBuilder +import org.elasticsearch.search.fetch.subphase.FetchSourceContext.FETCH_SOURCE +import org.elasticsearch.search.sort.SortBuilders +import org.elasticsearch.search.sort.SortOrder import org.elasticsearch.tasks.Task import org.elasticsearch.transport.TransportService +private val log = LogManager.getLogger(TransportExplainAction::class.java) + class TransportExplainAction @Inject constructor( val client: NodeClient, transportService: TransportService, @@ -39,18 +52,137 @@ class TransportExplainAction @Inject constructor( ExplainHandler(client, listener, request).start() } + /** + * first search config index to find out managed indices + * then retrieve metadata of these managed indices + * special case: when user explicitly query for an un-managed index + * return this index with its policy id shown 'null' meaning it's not managed + */ inner class ExplainHandler( private val client: NodeClient, private val actionListener: ActionListener, private val request: ExplainRequest ) { + private val indices: List = request.indices + private val explainAll: Boolean = indices.isEmpty() + private val wildcard: Boolean = indices.any { it.contains("*") } + + // map of index to index metadata got from config index job + private val managedIndicesMetaDataMap: MutableMap> = mutableMapOf() + private val managedIndices: MutableList = mutableListOf() + + private val indexNames: MutableList = mutableListOf() + private val enabledState: MutableMap = mutableMapOf() + private var totalManagedIndices = 0 + @Suppress("SpreadOperator") fun start() { + val params = request.searchParams + + val sortBuilder = SortBuilders + .fieldSort(params.sortField) + .order(SortOrder.fromString(params.sortOrder)) + + val queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders + .queryStringQuery(params.queryString) + .defaultField("managed_index.name") + .defaultOperator(Operator.AND)) + + var searchSourceBuilder = SearchSourceBuilder() + .from(params.from) + .fetchSource(FETCH_SOURCE) + .seqNoAndPrimaryTerm(true) + .version(true) + .sort(sortBuilder) + + if (!explainAll) { + searchSourceBuilder = searchSourceBuilder.size(MAX_HITS) + if (wildcard) { // explain/index* + indices.forEach { + if (it.contains("*")) { + queryBuilder.should(QueryBuilders.wildcardQuery("managed_index.index", it)) + } else { + queryBuilder.should(QueryBuilders.termsQuery("managed_index.index", it)) + } + } + } else { // explain/{index} + queryBuilder.filter(QueryBuilders.termsQuery("managed_index.index", indices)) + } + } else { // explain all + searchSourceBuilder = searchSourceBuilder.size(params.size) + queryBuilder.filter(QueryBuilders.existsQuery("managed_index")) + } + + searchSourceBuilder = searchSourceBuilder.query(queryBuilder) + + val searchRequest = SearchRequest() + .indices(INDEX_MANAGEMENT_INDEX) + .source(searchSourceBuilder) + + client.search(searchRequest, object : ActionListener { + override fun onResponse(response: SearchResponse) { + val totalHits = response.hits.totalHits + if (totalHits != null) { + totalManagedIndices = totalHits.value.toInt() + } + + response.hits.hits.map { + val hitMap = it.sourceAsMap["managed_index"] as Map + val managedIndex = hitMap["index"] as String + managedIndices.add(managedIndex) + enabledState[managedIndex] = hitMap["enabled"] as Boolean + managedIndicesMetaDataMap[managedIndex] = mapOf( + "index" to hitMap["index"] as String?, + "index_uuid" to hitMap["index_uuid"] as String?, + "policy_id" to hitMap["policy_id"] as String?, + "enabled" to hitMap["enabled"]?.toString() + ) + } + + // explain all only return managed indices + if (explainAll) { + if (managedIndices.size == 0) { + // edge case: if specify query param pagination size to be 0 + // we still show total managed indices + emptyResponse(totalManagedIndices) + return + } else { + indexNames.addAll(managedIndices) + getMetadata(managedIndices) + return + } + } + + // explain/{index} return results for all indices + indexNames.addAll(indices) + getMetadata(indices) + } + + override fun onFailure(t: Exception) { + if (t is IndexNotFoundException) { + // config index hasn't been initialized + // show all requested indices not managed + if (indices.isNotEmpty()) { + indexNames.addAll(indices) + getMetadata(indices) + return + } + emptyResponse() + return + } + actionListener.onFailure(t) + } + }) + } + + @Suppress("SpreadOperator") + fun getMetadata(indices: List) { val clusterStateRequest = ClusterStateRequest() val strictExpandIndicesOptions = IndicesOptions.strictExpand() clusterStateRequest.clear() - .indices(*request.indices.toTypedArray()) + .indices(*indices.toTypedArray()) .metadata(true) .local(request.local) .masterNodeTimeout(request.masterTimeout) @@ -58,7 +190,7 @@ class TransportExplainAction @Inject constructor( client.admin().cluster().state(clusterStateRequest, object : ActionListener { override fun onResponse(response: ClusterStateResponse) { - processResponse(response) + onClusterStateResponse(response) } override fun onFailure(t: Exception) { @@ -67,27 +199,52 @@ class TransportExplainAction @Inject constructor( }) } - fun processResponse(clusterStateResponse: ClusterStateResponse) { + fun onClusterStateResponse(clusterStateResponse: ClusterStateResponse) { val state = clusterStateResponse.state - val indexNames = mutableListOf() val indexPolicyIDs = mutableListOf() val indexMetadatas = mutableListOf() - for (indexMetadataEntry in state.metadata.indices) { - indexNames.add(indexMetadataEntry.key) + if (wildcard) { + indexNames.clear() // clear wildcard (index*) from indexNames + state.metadata.indices.forEach { indexNames.add(it.key) } + } + + // cluster state response will not resisting the sort order + // so use the order from previous search result saved in indexNames + for (indexName in indexNames) { + val indexMetadata = state.metadata.indices[indexName] - val indexMetadata = indexMetadataEntry.value - indexPolicyIDs.add(indexMetadata.getPolicyID()) + var managedIndexMetadataMap = managedIndicesMetaDataMap[indexName] + indexPolicyIDs.add(managedIndexMetadataMap?.get("policy_id")) // use policyID from metadata - val managedIndexMetaDataMap = indexMetadata.getCustomData(ManagedIndexMetaData.MANAGED_INDEX_METADATA) var managedIndexMetadata: ManagedIndexMetaData? = null - if (managedIndexMetaDataMap != null) { - managedIndexMetadata = ManagedIndexMetaData.fromMap(managedIndexMetaDataMap) + val clusterStateMetadata = indexMetadata.getCustomData(ManagedIndexMetaData.MANAGED_INDEX_METADATA) + if (managedIndexMetadataMap != null) { + if (clusterStateMetadata != null) { // if has metadata saved, use that + managedIndexMetadataMap = clusterStateMetadata + } + if (managedIndexMetadataMap.isNotEmpty()) { + managedIndexMetadata = ManagedIndexMetaData.fromMap(managedIndexMetadataMap) + } } indexMetadatas.add(managedIndexMetadata) } + managedIndicesMetaDataMap.clear() + + if (explainAll) { + actionListener.onResponse(ExplainAllResponse(indexNames, indexPolicyIDs, indexMetadatas, totalManagedIndices, enabledState)) + return + } actionListener.onResponse(ExplainResponse(indexNames, indexPolicyIDs, indexMetadatas)) } + + fun emptyResponse(size: Int = 0) { + if (explainAll) { + actionListener.onResponse(ExplainAllResponse(emptyList(), emptyList(), emptyList(), size, emptyMap())) + return + } + actionListener.onResponse(ExplainResponse(emptyList(), emptyList(), emptyList())) + } } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesAction.kt new file mode 100644 index 000000000..ec6422f1e --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesAction.kt @@ -0,0 +1,25 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy + +import org.elasticsearch.action.ActionType + +class GetPoliciesAction private constructor() : ActionType(NAME, ::GetPoliciesResponse) { + companion object { + val INSTANCE = GetPoliciesAction() + val NAME = "cluster:admin/opendistro/ism/policy/search" + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesRequest.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesRequest.kt new file mode 100644 index 000000000..0841bd30c --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesRequest.kt @@ -0,0 +1,48 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy + +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.SearchParams +import org.elasticsearch.action.ActionRequest +import org.elasticsearch.action.ActionRequestValidationException +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.StreamOutput +import java.io.IOException + +class GetPoliciesRequest : ActionRequest { + + val searchParams: SearchParams + + constructor( + searchParams: SearchParams + ) : super() { + this.searchParams = searchParams + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + searchParams = SearchParams(sin) + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + searchParams.writeTo(out) + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponse.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponse.kt new file mode 100644 index 000000000..82be00217 --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponse.kt @@ -0,0 +1,72 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy + +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE +import com.amazon.opendistroforelasticsearch.indexmanagement.util._ID +import com.amazon.opendistroforelasticsearch.indexmanagement.util._PRIMARY_TERM +import com.amazon.opendistroforelasticsearch.indexmanagement.util._SEQ_NO +import org.elasticsearch.action.ActionResponse +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.StreamOutput +import org.elasticsearch.common.xcontent.ToXContent +import org.elasticsearch.common.xcontent.ToXContentObject +import org.elasticsearch.common.xcontent.XContentBuilder +import java.io.IOException + +class GetPoliciesResponse : ActionResponse, ToXContentObject { + + val policies: List + val totalPolicies: Int + + constructor( + policies: List, + totalPolicies: Int + ) : super() { + this.policies = policies + this.totalPolicies = totalPolicies + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + policies = sin.readList(::Policy), + totalPolicies = sin.readInt() + ) + + override fun writeTo(out: StreamOutput) { + out.writeCollection(policies) + out.writeInt(totalPolicies) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .startArray("policies") + .apply { + for (policy in policies) { + this.startObject() + .field(_ID, policy.id) + .field(_SEQ_NO, policy.seqNo) + .field(_PRIMARY_TERM, policy.primaryTerm) + .field(Policy.POLICY_TYPE, policy, XCONTENT_WITHOUT_TYPE) + .endObject() + } + } + .endArray() + .field("total_policies", totalPolicies) + .endObject() + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPolicyAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPolicyAction.kt index fb42a9932..1340d6cbc 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPolicyAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPolicyAction.kt @@ -20,6 +20,6 @@ import org.elasticsearch.action.ActionType class GetPolicyAction private constructor() : ActionType(NAME, ::GetPolicyResponse) { companion object { val INSTANCE = GetPolicyAction() - val NAME = "cluster:admin/opendistro/ism/policy/read" + val NAME = "cluster:admin/opendistro/ism/policy/get" } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt new file mode 100644 index 000000000..d42a1e2df --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt @@ -0,0 +1,109 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy + +import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy +import org.apache.logging.log4j.LogManager +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.search.SearchRequest +import org.elasticsearch.action.search.SearchResponse +import org.elasticsearch.action.support.ActionFilters +import org.elasticsearch.action.support.HandledTransportAction +import org.elasticsearch.client.Client +import org.elasticsearch.common.inject.Inject +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler +import org.elasticsearch.common.xcontent.NamedXContentRegistry +import org.elasticsearch.common.xcontent.XContentFactory +import org.elasticsearch.common.xcontent.XContentType +import org.elasticsearch.index.IndexNotFoundException +import org.elasticsearch.index.query.Operator +import org.elasticsearch.index.query.QueryBuilders +import org.elasticsearch.search.builder.SearchSourceBuilder +import org.elasticsearch.search.sort.SortBuilders +import org.elasticsearch.search.sort.SortOrder +import org.elasticsearch.tasks.Task +import org.elasticsearch.transport.TransportService + +private val log = LogManager.getLogger(TransportGetPoliciesAction::class.java) + +class TransportGetPoliciesAction @Inject constructor( + transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry +) : HandledTransportAction( + GetPoliciesAction.NAME, transportService, actionFilters, ::GetPoliciesRequest +) { + + override fun doExecute( + task: Task, + getPoliciesRequest: GetPoliciesRequest, + actionListener: ActionListener + ) { + val params = getPoliciesRequest.searchParams + + val sortBuilder = SortBuilders + .fieldSort(params.sortField) + .order(SortOrder.fromString(params.sortOrder)) + + val queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.existsQuery("policy")) + + queryBuilder.must(QueryBuilders + .queryStringQuery(params.queryString) + .defaultOperator(Operator.AND) + .field("policy.policy_id")) + + val searchSourceBuilder = SearchSourceBuilder() + .query(queryBuilder) + .sort(sortBuilder) + .from(params.from) + .size(params.size) + .seqNoAndPrimaryTerm(true) + + val searchRequest = SearchRequest() + .source(searchSourceBuilder) + .indices(INDEX_MANAGEMENT_INDEX) + + client.search(searchRequest, object : ActionListener { + override fun onResponse(response: SearchResponse) { + val totalPolicies = response.hits.totalHits?.value ?: 0 + val policies = response.hits.hits.map { + val id = it.id + val seqNo = it.seqNo + val primaryTerm = it.primaryTerm + val xcp = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, it.sourceAsString) + xcp.parseWithType(id, seqNo, primaryTerm, Policy.Companion::parse) + .copy(id = id, seqNo = seqNo, primaryTerm = primaryTerm) + } + + actionListener.onResponse(GetPoliciesResponse(policies, totalPolicies.toInt())) + } + + override fun onFailure(t: Exception) { + if (t is IndexNotFoundException) { + // config index hasn't been initialized, catch this here and show empty result on Kibana + actionListener.onResponse(GetPoliciesResponse(emptyList(), 0)) + return + } + actionListener.onFailure(t) + } + }) + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt index 32365cb8d..53791da25 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt @@ -35,6 +35,13 @@ const val FAILURES = "failures" const val FAILED_INDICES = "failed_indices" const val UPDATED_INDICES = "updated_indices" +const val DEFAULT_PAGINATION_SIZE = 20 +const val DEFAULT_PAGINATION_FROM = 0 +const val DEFAULT_JOB_SORT_FIELD = "managed_index.index" +const val DEFAULT_POLICY_SORT_FIELD = "policy.policy_id.keyword" +const val DEFAULT_SORT_ORDER = "asc" +const val DEFAULT_QUERY_STRING = "*" + fun buildInvalidIndexResponse(builder: XContentBuilder, failedIndices: List) { if (failedIndices.isNotEmpty()) { builder.field(FAILURES, true) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerRequest.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerRequest.kt index 19a0d57e9..fbfa05352 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerRequest.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerRequest.kt @@ -20,6 +20,7 @@ import org.elasticsearch.common.io.stream.StreamInput import java.io.IOException class RefreshSearchAnalyzerRequest : BroadcastRequest { + @Suppress("SpreadOperator") constructor(vararg indices: String) : super(*indices) @Throws(IOException::class) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementIndicesIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementIndicesIT.kt index d48fac42a..498c6344f 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementIndicesIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementIndicesIT.kt @@ -23,6 +23,8 @@ import java.util.Locale class IndexManagementIndicesIT : IndexStateManagementRestTestCase() { private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) + private val configSchemaVersion = 6 + private val historySchemaVersion = 3 /* * If this test fails it means you changed the config mappings @@ -60,7 +62,7 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() { assertIndexDoesNotExist(INDEX_MANAGEMENT_INDEX) val mapping = indexManagementMappings.trim().trimStart('{').trimEnd('}') - .replace("\"schema_version\": 6", "\"schema_version\": 0") + .replace("\"schema_version\": $configSchemaVersion", "\"schema_version\": 0") createIndex(INDEX_MANAGEMENT_INDEX, Settings.builder().put("index.hidden", true).build(), mapping) assertIndexExists(INDEX_MANAGEMENT_INDEX) @@ -71,14 +73,14 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() { client().makeRequest("PUT", "$POLICY_BASE_URI/$policyId", emptyMap(), policy.toHttpEntity()) assertIndexExists(INDEX_MANAGEMENT_INDEX) - verifyIndexSchemaVersion(INDEX_MANAGEMENT_INDEX, 6) + verifyIndexSchemaVersion(INDEX_MANAGEMENT_INDEX, configSchemaVersion) } fun `test update management index history mappings with new schema version`() { assertIndexDoesNotExist("$HISTORY_WRITE_INDEX_ALIAS?allow_no_indices=false") val mapping = indexStateManagementHistoryMappings.trim().trimStart('{').trimEnd('}') - .replace("\"schema_version\": 3", "\"schema_version\": 0") + .replace("\"schema_version\": $historySchemaVersion", "\"schema_version\": 0") val aliases = "\"$HISTORY_WRITE_INDEX_ALIAS\": { \"is_write_index\": true }" createIndex("$HISTORY_INDEX_BASE-1", Settings.builder().put("index.hidden", true).build(), mapping, aliases) @@ -99,7 +101,7 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() { waitFor { assertIndexExists(HISTORY_WRITE_INDEX_ALIAS) - verifyIndexSchemaVersion(HISTORY_WRITE_INDEX_ALIAS, 3) + verifyIndexSchemaVersion(HISTORY_WRITE_INDEX_ALIAS, historySchemaVersion) } } @@ -115,7 +117,7 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() { assertEquals("Policy id does not match", policy.id, managedIndexConfig.policyID) val mapping = "{" + indexManagementMappings.trimStart('{').trimEnd('}') - .replace("\"schema_version\": 6", "\"schema_version\": 0") + .replace("\"schema_version\": $configSchemaVersion", "\"schema_version\": 0") val entity = StringEntity(mapping, ContentType.APPLICATION_JSON) client().makeRequest(RestRequest.Method.PUT.toString(), @@ -129,7 +131,7 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() { RestRequest.Method.POST.toString(), "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity()) - verifyIndexSchemaVersion(INDEX_MANAGEMENT_INDEX, 6) + verifyIndexSchemaVersion(INDEX_MANAGEMENT_INDEX, configSchemaVersion) assertAffectedIndicesResponseIsEqual(mapOf(FAILURES to false, FAILED_INDICES to emptyList(), UPDATED_INDICES to 1), response.asMap()) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index 1954416fd..81000dbf7 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -50,6 +50,7 @@ import org.apache.http.HttpHeaders import org.apache.http.entity.ContentType.APPLICATION_JSON import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader +import org.apache.logging.log4j.LogManager import org.elasticsearch.ElasticsearchParseException import org.elasticsearch.action.get.GetResponse import org.elasticsearch.action.search.SearchResponse @@ -77,6 +78,8 @@ import java.util.Locale abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() { + val log = LogManager.getLogger(IndexStateManagementRestTestCase::class.java) + protected fun createPolicy( policy: Policy, policyId: String = ESTestCase.randomAlphaOfLength(10), @@ -433,8 +436,10 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() protected fun getFlatSettings(indexName: String) = (getIndexSettings(indexName) as Map>>)[indexName]!!["settings"] as Map - protected fun getExplainMap(indexName: String): Map { - val response = client().makeRequest(RestRequest.Method.GET.toString(), "${RestExplainAction.EXPLAIN_BASE_URI}/$indexName") + protected fun getExplainMap(indexName: String?): Map { + var endpoint = RestExplainAction.EXPLAIN_BASE_URI + if (indexName != null) endpoint += "/$indexName" + val response = client().makeRequest(RestRequest.Method.GET.toString(), endpoint) assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) return response.asMap() } @@ -476,16 +481,19 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() val response = client().makeRequest(RestRequest.Method.GET.toString(), "${RestExplainAction.EXPLAIN_BASE_URI}/$indexName") assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) - lateinit var metadata: ManagedIndexMetaData val xcp = createParser(XContentType.JSON.xContent(), response.entity.content) ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { - xcp.currentName() + val cn = xcp.currentName() xcp.nextToken() + if (cn == "total_managed_indices") continue metadata = ManagedIndexMetaData.parse(xcp) } + + // make sure metadata is initialised + assertTrue(metadata.transitionTo != null || metadata.stateMetaData != null || metadata.info != null || metadata.policyCompleted != null) return metadata } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt index d5fa9241e..13ce85b89 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt @@ -47,13 +47,7 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { // First execution. We need to initialize the policy. updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { - assertPredicatesOnMetaData( - listOf(indexName to listOf(ManagedIndexMetaData.POLICY_ID to policyID::equals)), - getExplainMap(indexName), - strict = false - ) - } + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } // the second execution we move into rollover action, we won't hit the timeout as this is the execution that sets the startTime updateManagedIndexConfigStartTime(managedIndexConfig) @@ -97,13 +91,7 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() { // First execution. We need to initialize the policy. updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { - assertPredicatesOnMetaData( - listOf(indexName to listOf(ManagedIndexMetaData.POLICY_ID to policyID::equals)), - getExplainMap(indexName), - strict = false - ) - } + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } // the second execution we move into open action, we won't hit the timeout as this is the execution that sets the startTime updateManagedIndexConfigStartTime(managedIndexConfig) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt index 652716c21..a1faadf59 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt @@ -18,13 +18,13 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.POLICY_BASE_URI import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import com.amazon.opendistroforelasticsearch.indexmanagement.makeRequest import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.ActionConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomPolicy import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomReadOnlyActionConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomState import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexmanagement.makeRequest import com.amazon.opendistroforelasticsearch.indexmanagement.util._ID import com.amazon.opendistroforelasticsearch.indexmanagement.util._PRIMARY_TERM import com.amazon.opendistroforelasticsearch.indexmanagement.util._SEQ_NO @@ -34,6 +34,7 @@ import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.client.ResponseException import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent +import org.elasticsearch.rest.RestRequest import org.elasticsearch.rest.RestStatus import org.elasticsearch.test.ESTestCase import org.elasticsearch.test.junit.annotations.TestLogging @@ -71,7 +72,8 @@ class IndexStateManagementRestApiIT : IndexStateManagementRestTestCase() { fun `test creating a policy`() { val policy = randomPolicy() val policyId = ESTestCase.randomAlphaOfLength(10) - val createResponse = client().makeRequest("PUT", "$POLICY_BASE_URI/$policyId", emptyMap(), policy.toHttpEntity()) + val createResponse = + client().makeRequest("PUT", "$POLICY_BASE_URI/$policyId", emptyMap(), policy.toHttpEntity()) assertEquals("Create policy failed", RestStatus.CREATED, createResponse.restStatus()) val responseBody = createResponse.asMap() @@ -124,10 +126,15 @@ class IndexStateManagementRestApiIT : IndexStateManagementRestTestCase() { // createRandomPolicy currently does not create a random list of actions so it won't accidentally create one with read_only val policy = createRandomPolicy() // update the policy to have read_only action which is not allowed - val updatedPolicy = policy.copy(defaultState = "some_state", states = listOf(randomState(name = "some_state", actions = listOf(randomReadOnlyActionConfig())))) - client().makeRequest("PUT", + val updatedPolicy = policy.copy( + defaultState = "some_state", + states = listOf(randomState(name = "some_state", actions = listOf(randomReadOnlyActionConfig()))) + ) + client().makeRequest( + "PUT", "$POLICY_BASE_URI/${updatedPolicy.id}?refresh=true&if_seq_no=${updatedPolicy.seqNo}&if_primary_term=${updatedPolicy.primaryTerm}", - emptyMap(), updatedPolicy.toHttpEntity()) + emptyMap(), updatedPolicy.toHttpEntity() + ) fail("Expected 403 Method FORBIDDEN response") } catch (e: ResponseException) { assertEquals("Unexpected status", RestStatus.FORBIDDEN, e.response.restStatus()) @@ -150,11 +157,13 @@ class IndexStateManagementRestApiIT : IndexStateManagementRestTestCase() { createRandomPolicy() val response = client().makeRequest("GET", "/$INDEX_MANAGEMENT_INDEX/_mapping") - val parserMap = createParser(XContentType.JSON.xContent(), response.entity.content).map() as Map> + val parserMap = + createParser(XContentType.JSON.xContent(), response.entity.content).map() as Map> val mappingsMap = parserMap[INDEX_MANAGEMENT_INDEX]!!["mappings"] as Map val expected = createParser( - XContentType.JSON.xContent(), - javaClass.classLoader.getResource("mappings/opendistro-ism-config.json").readText()) + XContentType.JSON.xContent(), + javaClass.classLoader.getResource("mappings/opendistro-ism-config.json").readText() + ) val expectedMap = expected.map() assertEquals("Mappings are different", expectedMap, mappingsMap) @@ -165,9 +174,11 @@ class IndexStateManagementRestApiIT : IndexStateManagementRestTestCase() { val policy = createRandomPolicy() try { - client().makeRequest("PUT", - "$POLICY_BASE_URI/${policy.id}?refresh=true&if_seq_no=10251989&if_primary_term=2342", - emptyMap(), policy.toHttpEntity()) + client().makeRequest( + "PUT", + "$POLICY_BASE_URI/${policy.id}?refresh=true&if_seq_no=10251989&if_primary_term=2342", + emptyMap(), policy.toHttpEntity() + ) fail("expected 409 ResponseException") } catch (e: ResponseException) { assertEquals(RestStatus.CONFLICT, e.response.restStatus()) @@ -177,9 +188,11 @@ class IndexStateManagementRestApiIT : IndexStateManagementRestTestCase() { @Throws(Exception::class) fun `test update policy with correct seq_no and primary_term`() { val policy = createRandomPolicy() - val updateResponse = client().makeRequest("PUT", - "$POLICY_BASE_URI/${policy.id}?refresh=true&if_seq_no=${policy.seqNo}&if_primary_term=${policy.primaryTerm}", - emptyMap(), policy.toHttpEntity()) + val updateResponse = client().makeRequest( + "PUT", + "$POLICY_BASE_URI/${policy.id}?refresh=true&if_seq_no=${policy.seqNo}&if_primary_term=${policy.primaryTerm}", + emptyMap(), policy.toHttpEntity() + ) assertEquals("Update policy failed", RestStatus.OK, updateResponse.restStatus()) val responseBody = updateResponse.asMap() @@ -260,10 +273,54 @@ class IndexStateManagementRestApiIT : IndexStateManagementRestTestCase() { } } """.trimIndent() - val response = client().makeRequest("POST", "$INDEX_MANAGEMENT_INDEX/_search", emptyMap(), - StringEntity(request, APPLICATION_JSON)) + val response = client().makeRequest( + "POST", "$INDEX_MANAGEMENT_INDEX/_search", emptyMap(), + StringEntity(request, APPLICATION_JSON) + ) assertEquals("Request failed", RestStatus.OK, response.restStatus()) val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, response.entity.content)) assertTrue("Did not find policy using fuzzy search", searchResponse.hits.hits.size == 1) } + + fun `test get policies before ism init`() { + val actualResponse = client().makeRequest(RestRequest.Method.GET.toString(), POLICY_BASE_URI).asMap() + val expectedResponse = mapOf( + "policies" to emptyList(), + "total_policies" to 0 + ) + assertEquals(expectedResponse, actualResponse) + } + + fun `test get policies with actual policy`() { + val policy = createRandomPolicy() + + val response = client().makeRequest(RestRequest.Method.GET.toString(), POLICY_BASE_URI) + + val actualMessage = response.asMap() + val expectedMessage = mapOf( + "total_policies" to 1, + "policies" to listOf(mapOf( + _SEQ_NO to policy.seqNo, + _ID to policy.id, + _PRIMARY_TERM to policy.primaryTerm, + Policy.POLICY_TYPE to mapOf( + "schema_version" to policy.schemaVersion, + "policy_id" to policy.id, + "last_updated_time" to policy.lastUpdatedTime.toEpochMilli(), + "default_state" to policy.defaultState, + "description" to policy.description, + "error_notification" to policy.errorNotification, + "states" to policy.states.map { + mapOf( + "name" to it.name, + "transitions" to it.transitions, + "actions" to it.actions + ) + } + ) + )) + ) + + assertEquals(expectedMessage.toString(), actualMessage.toString()) + } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt index e89beced9..62e4b0f84 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt @@ -16,15 +16,11 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import com.amazon.opendistroforelasticsearch.indexmanagement.makeRequest import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StateMetaData import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings import com.amazon.opendistroforelasticsearch.indexmanagement.waitFor -import org.elasticsearch.client.ResponseException -import org.elasticsearch.rest.RestRequest -import org.elasticsearch.rest.RestStatus import java.time.Instant import java.util.Locale @@ -32,27 +28,6 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) - fun `test missing indices`() { - try { - client().makeRequest(RestRequest.Method.GET.toString(), RestExplainAction.EXPLAIN_BASE_URI) - fail("Expected a failure") - } catch (e: ResponseException) { - assertEquals("Unexpected RestStatus", RestStatus.BAD_REQUEST, e.response.restStatus()) - val actualMessage = e.response.asMap() - val expectedErrorMessage = mapOf( - "error" to mapOf( - "root_cause" to listOf>( - mapOf("type" to "illegal_argument_exception", "reason" to "Missing indices") - ), - "type" to "illegal_argument_exception", - "reason" to "Missing indices" - ), - "status" to 400 - ) - assertEquals(expectedErrorMessage, actualMessage) - } - } - fun `test single index`() { val indexName = "${testIndexName}_movies" createIndex(indexName, null) @@ -64,25 +39,90 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { assertResponseMap(expected, getExplainMap(indexName)) } + fun `test single index explain all`() { + val indexName = "${testIndexName}_movies" + createIndex(indexName, null) + val expected = mapOf( + "total_managed_indices" to 0 + ) + assertResponseMap(expected, getExplainMap(null)) + } + + fun `test two indices, one managed one not managed`() { + // explicitly asks for un-managed index, will return policy_id as null + val indexName1 = "${testIndexName}_managed" + val indexName2 = "${testIndexName}_not_managed" + val policy = createRandomPolicy() + createIndex(indexName1, policy.id) + createIndex(indexName2, null) + + val expected = mapOf( + indexName1 to mapOf( + ManagedIndexSettings.POLICY_ID.key to policy.id, + "index" to indexName1, + "index_uuid" to getUuid(indexName1), + "policy_id" to policy.id + ), + indexName2 to mapOf( + ManagedIndexSettings.POLICY_ID.key to null + ) + ) + waitFor { + assertResponseMap(expected, getExplainMap("$indexName1,$indexName2")) + } + } + + fun `test two indices, one managed one not managed explain all`() { + // explain all returns only managed indices + val indexName1 = "${testIndexName}_managed" + val indexName2 = "${testIndexName}_not_managed" + val policy = createRandomPolicy() + createIndex(indexName1, policy.id) + createIndex(indexName2, null) + + val expected = mapOf( + indexName1 to mapOf( + ManagedIndexSettings.POLICY_ID.key to policy.id, + "index" to indexName1, + "index_uuid" to getUuid(indexName1), + "policy_id" to policy.id, + "enabled" to true + ), + "total_managed_indices" to 1 + ) + waitFor { + assertResponseMap(expected, getExplainMap(null)) + } + } + fun `test index pattern`() { - val indexName1 = "${testIndexName}_video" + val indexName1 = "${testIndexName}_pattern" val indexName2 = "${indexName1}_2" val indexName3 = "${indexName1}_3" - createIndex(indexName1, null) - createIndex(indexName2, null) + val policy = createRandomPolicy() + createIndex(indexName1, policyID = policy.id) + createIndex(indexName2, policyID = policy.id) createIndex(indexName3, null) val expected = mapOf( - indexName1 to mapOf( - ManagedIndexSettings.POLICY_ID.key to null + indexName1 to mapOf( + ManagedIndexSettings.POLICY_ID.key to policy.id, + "index" to indexName1, + "index_uuid" to getUuid(indexName1), + "policy_id" to policy.id ), - indexName2 to mapOf( - ManagedIndexSettings.POLICY_ID.key to null + indexName2 to mapOf( + ManagedIndexSettings.POLICY_ID.key to policy.id, + "index" to indexName2, + "index_uuid" to getUuid(indexName2), + "policy_id" to policy.id ), - indexName3 to mapOf( + indexName3 to mapOf( ManagedIndexSettings.POLICY_ID.key to null ) ) - assertResponseMap(expected, getExplainMap("$indexName1*")) + waitFor { + assertResponseMap(expected, getExplainMap("$indexName1*")) + } } fun `test attached policy`() { @@ -143,15 +183,20 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { } @Suppress("UNCHECKED_CAST") // Do assertion of the response map here so we don't have many places to do suppression. - private fun assertResponseMap(expected: Map>, actual: Map) { - actual as Map> + private fun assertResponseMap(expected: Map, actual: Map) { assertEquals("Explain Map does not match", expected.size, actual.size) for (metaDataEntry in expected) { - assertMetaDataEntries(metaDataEntry.value, actual[metaDataEntry.key]!!) + if (metaDataEntry.key == "total_managed_indices") { + assertEquals(metaDataEntry.value, actual[metaDataEntry.key]) + continue + } + val value = metaDataEntry.value as Map + actual as Map> + assertMetaDataEntries(value, actual[metaDataEntry.key]!!) } } - private fun assertMetaDataEntries(expected: Map, actual: Map) { + private fun assertMetaDataEntries(expected: Map, actual: Map) { assertEquals("MetaDataSize are not the same", expected.size, actual.size) for (entry in expected) { assertEquals("Expected and actual values does not match", entry.value, actual[entry.key]) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt index c90728b9d..490d3817e 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt @@ -246,13 +246,7 @@ class RestRetryFailedManagedIndexActionIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig) // verify we have policy - waitFor { - assertPredicatesOnMetaData( - listOf(indexName to listOf(ManagedIndexMetaData.POLICY_ID to policyID::equals)), - getExplainMap(indexName), - false - ) - } + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } // speed up to execute set read only force merge step updateManagedIndexConfigStartTime(managedIndexConfig) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt index c1b642e62..f227ba77d 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/runner/ManagedIndexRunnerIT.kt @@ -63,13 +63,7 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { // init policy on managed index updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { - assertPredicatesOnMetaData( - listOf(indexName to listOf(ManagedIndexMetaData.POLICY_ID to policyID::equals)), - getExplainMap(indexName), - strict = false - ) - } + waitFor { assertEquals(policy.id, getExplainManagedIndexMetaData(indexName).policyID) } // change policy seqNo on managed index updateManagedIndexConfigPolicySeqNo(managedIndexConfig.copy(policySeqNo = 17)) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/ActionTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/ActionTests.kt index 003329b8f..a1083194e 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/ActionTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/ActionTests.kt @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.deletepolicy.DeletePolicyAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.IndexPolicyAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPoliciesAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPolicyAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.RemovePolicyAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.RetryFailedManagedIndexAction @@ -65,4 +66,9 @@ class ActionTests : ESTestCase() { assertNotNull(GetPolicyAction.NAME) assertEquals(GetPolicyAction.INSTANCE.name(), GetPolicyAction.NAME) } + + fun `test get policies action name`() { + assertNotNull(GetPoliciesAction.NAME) + assertEquals(GetPoliciesAction.INSTANCE.name(), GetPoliciesAction.NAME) + } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainRequestTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainRequestTests.kt index a66171258..00c064124 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainRequestTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainRequestTests.kt @@ -15,6 +15,7 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.SearchParams import org.elasticsearch.common.io.stream.BytesStreamOutput import org.elasticsearch.common.io.stream.StreamInput import org.elasticsearch.common.unit.TimeValue @@ -26,7 +27,8 @@ class ExplainRequestTests : ESTestCase() { val indices = listOf("index1", "index2") val local = true val masterTimeout = TimeValue.timeValueSeconds(30) - val req = ExplainRequest(indices, local, masterTimeout) + val params = SearchParams(0, 20, "sort-field", "asc", "*") + val req = ExplainRequest(indices, local, masterTimeout, params) val out = BytesStreamOutput() req.writeTo(out) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt index a482cb90e..0b9cff806 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/explain/ExplainResponseTests.kt @@ -51,4 +51,38 @@ class ExplainResponseTests : ESTestCase() { assertEquals(indexPolicyIDs, newRes.indexPolicyIDs) assertEquals(indexMetadatas, newRes.indexMetadatas) } + + fun `test explain all response`() { + val indexNames = listOf("index1") + val indexPolicyIDs = listOf("policyID1") + val metadata = ManagedIndexMetaData( + index = "index1", + indexUuid = randomAlphaOfLength(10), + policyID = "policyID1", + policySeqNo = randomNonNegativeLong(), + policyPrimaryTerm = randomNonNegativeLong(), + policyCompleted = null, + rolledOver = null, + transitionTo = randomAlphaOfLength(10), + stateMetaData = null, + actionMetaData = null, + stepMetaData = null, + policyRetryInfo = null, + info = null + ) + val indexMetadatas = listOf(metadata) + val totalManagedIndices = 1 + val enabledState = mapOf("index1" to true) + val res = ExplainAllResponse(indexNames, indexPolicyIDs, indexMetadatas, totalManagedIndices, enabledState) + + val out = BytesStreamOutput() + res.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRes = ExplainAllResponse(sin) + assertEquals(indexNames, newRes.indexNames) + assertEquals(indexPolicyIDs, newRes.indexPolicyIDs) + assertEquals(indexMetadatas, newRes.indexMetadatas) + assertEquals(totalManagedIndices, newRes.totalManagedIndices) + assertEquals(enabledState, newRes.enabledState) + } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesRequestTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesRequestTests.kt new file mode 100644 index 000000000..5be256ddf --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesRequestTests.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy + +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.SearchParams +import org.elasticsearch.common.io.stream.BytesStreamOutput +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.test.ESTestCase + +class GetPoliciesRequestTests : ESTestCase() { + + fun `test get policies request`() { + val table = SearchParams(20, 0, "policy.policy_id.keyword", "desc", "*") + val req = GetPoliciesRequest(table) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetPoliciesRequest(sin) + assertEquals(table, newReq.searchParams) + } +} diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponseTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponseTests.kt new file mode 100644 index 000000000..1172cd930 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPoliciesResponseTests.kt @@ -0,0 +1,37 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy + +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomPolicy +import org.elasticsearch.common.io.stream.BytesStreamOutput +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.test.ESTestCase + +class GetPoliciesResponseTests : ESTestCase() { + + fun `test get policies response`() { + val policy = randomPolicy() + val res = GetPoliciesResponse(listOf(policy), 1) + + val out = BytesStreamOutput() + res.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRes = GetPoliciesResponse(sin) + assertEquals(1, newRes.totalPolicies) + assertEquals(1, newRes.policies.size) + assertEquals(policy, newRes.policies[0]) + } +} diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPolicyResponseTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPolicyResponseTests.kt index 7f9a95e7e..6dd588ac6 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPolicyResponseTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/GetPolicyResponseTests.kt @@ -27,7 +27,7 @@ import java.time.temporal.ChronoUnit class GetPolicyResponseTests : ESTestCase() { - fun `test explain response`() { + fun `test get policy response`() { val id = "id" val version: Long = 1 val primaryTerm: Long = 123