Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

new ISM template #383

Merged
merged 26 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
789dd87
start index template API
bowenlan-amzn Nov 18, 2020
c93795b
start to modify put ism API to observe regulation
bowenlan-amzn Dec 9, 2020
964ef61
conform to regulation add template API
bowenlan-amzn Dec 9, 2020
a29eae1
save progress
bowenlan-amzn Dec 9, 2020
5eefbe2
test in progress
bowenlan-amzn Dec 9, 2020
2b1ac28
draft IT
bowenlan-amzn Dec 10, 2020
31889da
simple tests for request response
bowenlan-amzn Dec 10, 2020
80d4519
ktlint
bowenlan-amzn Dec 10, 2020
b9aaa7a
start to clean code
bowenlan-amzn Dec 10, 2020
ff40535
wanna see code cov
bowenlan-amzn Dec 10, 2020
182e3c3
clean up 1
bowenlan-amzn Dec 10, 2020
27b1f6f
try remove seeming not used part in template metadata
bowenlan-amzn Dec 11, 2020
82f368a
clean up
bowenlan-amzn Dec 16, 2020
24a4791
going to clean up
bowenlan-amzn Jan 12, 2021
8c7067c
new implementation
bowenlan-amzn Jan 12, 2021
796fdd5
Merge branch 'master' into ismtemplate2
bowenlan-amzn Jan 21, 2021
5d9fc6b
Merge branch 'ismtemplate2' of github.com:bowenlan-amzn/index-managem…
bowenlan-amzn Jan 22, 2021
b69d224
address Ravi comments
bowenlan-amzn Jan 22, 2021
ead0450
Merge branch 'master' into ismtemplate2
dbbaughe Jan 26, 2021
f3d54e2
Merge branch 'master' into ismtemplate2
bowenlan-amzn Jan 26, 2021
77f9d66
address Drew's comments
bowenlan-amzn Jan 26, 2021
f275a29
suppress detekt complain
bowenlan-amzn Jan 26, 2021
d5578d0
add a test for ISMTemplate Writeable
bowenlan-amzn Jan 27, 2021
31f6452
Merge branch 'master' into ismtemplate2
bowenlan-amzn Jan 28, 2021
f39d371
coordinator consistent
bowenlan-amzn Jan 28, 2021
33b3544
address Mo's comments
bowenlan-amzn Jan 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
start index template API
add filter by lastupdatetime, sort by priority logic
  • Loading branch information
bowenlan-amzn committed Jan 12, 2021
commit 789dd87e151b25206a3d3aba97b835814c7243e7
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ package com.amazon.opendistroforelasticsearch.indexmanagement
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.IndexStateManagementHistory
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplateMetadata
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestAddISMTemplateAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestAddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestDeleteISMTemplateAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestDeletePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestExplainAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestGetISMTemplateAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestGetPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestIndexPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRemovePolicyAction
Expand All @@ -43,6 +47,12 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.TransportGetPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.IndexPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.TransportIndexPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.ismtemplate.delete.DeleteISMTemplateAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.ismtemplate.delete.TransportDeleteISMTemplateAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.ismtemplate.get.GetISMTemplateAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.ismtemplate.get.TransportGetISMTemplateAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.ismtemplate.put.PutISMTemplateAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.ismtemplate.put.TransportPutISMTemplateAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.RemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.TransportRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.RetryFailedManagedIndexAction
Expand Down Expand Up @@ -90,15 +100,19 @@ import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.action.support.ActionFilter
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.NamedDiff
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
import org.elasticsearch.cluster.metadata.Metadata
import org.elasticsearch.cluster.node.DiscoveryNodes
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.io.stream.NamedWriteableRegistry
import org.elasticsearch.common.io.stream.Writeable
import org.elasticsearch.common.settings.ClusterSettings
import org.elasticsearch.common.settings.IndexScopedSettings
import org.elasticsearch.common.settings.Setting
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.settings.SettingsFilter
import org.elasticsearch.common.xcontent.ContextParser
import org.elasticsearch.common.util.concurrent.ThreadContext
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentParser.Token
Expand Down Expand Up @@ -132,6 +146,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
const val ROLLUP_BASE_URI = "$OPEN_DISTRO_BASE_URI/_rollup"
const val POLICY_BASE_URI = "$ISM_BASE_URI/policies"
const val ROLLUP_JOBS_BASE_URI = "$ROLLUP_BASE_URI/jobs"
const val ISM_TEMPLATE_BASE_URI = "$ISM_BASE_URI/templates"
const val INDEX_MANAGEMENT_INDEX = ".opendistro-ism-config"
const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management"
const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data"
Expand Down Expand Up @@ -197,7 +212,10 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
RestIndexRollupAction(),
RestStartRollupAction(),
RestStopRollupAction(),
RestExplainRollupAction()
RestExplainRollupAction(),
RestAddISMTemplateAction(),
RestGetISMTemplateAction(),
RestDeleteISMTemplateAction()
)
}

Expand Down Expand Up @@ -298,10 +316,42 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
ActionPlugin.ActionHandler(StartRollupAction.INSTANCE, TransportStartRollupAction::class.java),
ActionPlugin.ActionHandler(StopRollupAction.INSTANCE, TransportStopRollupAction::class.java),
ActionPlugin.ActionHandler(ExplainRollupAction.INSTANCE, TransportExplainRollupAction::class.java),
ActionPlugin.ActionHandler(UpdateRollupMappingAction.INSTANCE, TransportUpdateRollupMappingAction::class.java)
ActionPlugin.ActionHandler(UpdateRollupMappingAction.INSTANCE, TransportUpdateRollupMappingAction::class.java),
ActionPlugin.ActionHandler(PutISMTemplateAction.INSTANCE, TransportPutISMTemplateAction::class.java),
ActionPlugin.ActionHandler(GetISMTemplateAction.INSTANCE, TransportGetISMTemplateAction::class.java),
ActionPlugin.ActionHandler(DeleteISMTemplateAction.INSTANCE, TransportDeleteISMTemplateAction::class.java)
)
}

// override fun getNamedXContent(): MutableList<NamedXContentRegistry.Entry> {
// val entries = mutableListOf<NamedXContentRegistry.Entry>()
// val ismTemplateEntry = NamedXContentRegistry.Entry(
// Metadata.Custom::class.java,
// ISMTemplateMetadata.ISM_TEMPLATE,
// ContextParser{ p, _ -> ISMTemplateMetadata.parse(p) }
// )
// entries.add(ismTemplateEntry)
// return entries
// }

override fun getNamedWriteables(): MutableList<NamedWriteableRegistry.Entry> {
// ClusterModule 139
val entries = mutableListOf<NamedWriteableRegistry.Entry>()
val ismTemplateEntry = NamedWriteableRegistry.Entry(
Metadata.Custom::class.java,
ISMTemplateMetadata.TYPE,
Writeable.Reader{ sin -> ISMTemplateMetadata(sin) }
)
val ismTemplateEntry2 = NamedWriteableRegistry.Entry(
NamedDiff::class.java,
ISMTemplateMetadata.TYPE,
Writeable.Reader{ sin -> ISMTemplateMetadata.readDiffFrom(sin) }
)
entries.add(ismTemplateEntry)
entries.add(ismTemplateEntry2)
return entries
}

override fun getTransportInterceptors(namedWriteableRegistry: NamedWriteableRegistry, threadContext: ThreadContext): List<TransportInterceptor> {
return listOf(rollupInterceptor)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.ismTemplates
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.putISMTemplate
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.removeISMTemplate
import org.apache.logging.log4j.LogManager
import org.apache.lucene.util.automaton.Operations
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.ClusterStateUpdateTask
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.cluster.metadata.Metadata
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.Priority
import org.elasticsearch.common.Strings
import org.elasticsearch.common.ValidationException
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.regex.Regex
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.indices.InvalidIndexTemplateException
import java.util.*
import java.util.stream.Collectors

private val log = LogManager.getLogger(ISMTemplateService::class.java)

// MetadataIndexTemplateService
class ISMTemplateService @Inject constructor(
val clusterService: ClusterService
) {
/**
* save ISM template to cluster state metadata
*/
fun putISMTemplate(templateName: String, template: ISMTemplate, masterTimeout: TimeValue,
listener: ActionListener<AcknowledgedResponse>) {
clusterService.submitStateUpdateTask(
IndexManagementPlugin.PLUGIN_NAME,
object : ClusterStateUpdateTask(Priority.NORMAL) {
override fun execute(currentState: ClusterState): ClusterState {
return addISMTemplate(currentState, templateName, template)
}

override fun onFailure(source: String, e: Exception) {
listener.onFailure(e)
}

override fun timeout(): TimeValue = masterTimeout

override fun clusterStateProcessed(source: String, oldState: ClusterState, newState: ClusterState) {
listener.onResponse(AcknowledgedResponse(true))
}
}
)
}

fun addISMTemplate(currentState: ClusterState, templateName: String, template: ISMTemplate): ClusterState {
val existingTemplates = currentState.metadata.ismTemplates()
val existingTemplate = existingTemplates[templateName]

log.info("existing matching template $existingTemplate")
log.info("input template $template")

if (template == existingTemplate) return currentState

// find templates with overlapping index pattern
val overlaps = findConflictingISMTemplates(templateName, template.indexPatterns, template.priority, existingTemplates)
log.info("find overlapping templates $overlaps")
if (overlaps.isNotEmpty()) {
val esg = "new ism template $templateName has index pattern ${template.indexPatterns} matching existing templates ${overlaps.entries.stream().map { "${it.key} => ${it.value}" }.collect(Collectors.joining(","))}, please use a different priority than ${template.priority}"
throw IllegalArgumentException(esg)
}

validateFormat(templateName, template.indexPatterns)

log.info("updating ISM template $templateName")
return ClusterState.builder(currentState).metadata(Metadata.builder(currentState.metadata())
.putISMTemplate(templateName, template, existingTemplates)).build()
}

/**
* remove ISM template from cluster state metadata
*/
fun deleteISMTemplate(templateName: String, masterTimeout: TimeValue, listener: ActionListener<AcknowledgedResponse>) {
log.info("service remove template")
clusterService.submitStateUpdateTask(
IndexManagementPlugin.PLUGIN_NAME,
object : ClusterStateUpdateTask(Priority.NORMAL) {
override fun execute(currentState: ClusterState): ClusterState {
log.info("service remove template $templateName")
val existingTemplates = currentState.metadata.ismTemplates()
return ClusterState.builder(currentState).metadata(Metadata.builder(currentState.metadata).removeISMTemplate(templateName, existingTemplates)).build()
}

override fun onFailure(source: String, e: Exception) {
listener.onFailure(e)
}

override fun timeout(): TimeValue = masterTimeout

override fun clusterStateProcessed(source: String, oldState: ClusterState, newState: ClusterState) {
listener.onResponse(AcknowledgedResponse(true))
}
}
)
}

companion object {
/**
* find the matching template name for the given index name
*
* filter out hidden index
* filter out older index than template lastUpdateTime
*/
// findV2Template
fun findMatchingISMTemplate(ismTemplates: Map<String, ISMTemplate>, indexMetadata: IndexMetadata): String? {
val indexName = indexMetadata.index.name

// don't include hidden index
val isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings)
log.info("index $indexName is hidden $isHidden")
if (isHidden) return null

val ismTemplates = ismTemplates.filter { (_, template) ->
log.info("template last update time: ${template.lastUpdatedTime.toEpochMilli()}")
log.info("index create time: ${indexMetadata.creationDate}")
log.info("is template older? ${template.lastUpdatedTime.toEpochMilli() < indexMetadata.creationDate}")
template.lastUpdatedTime.toEpochMilli() < indexMetadata.creationDate
}

// traverse all ism templates for matching ones
val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, indexName) }
val matchedTemplates = mutableMapOf<ISMTemplate, String>()
ismTemplates.forEach { (templateName, template) ->
val matched = template.indexPatterns.stream().anyMatch(patternMatchPredicate)
if (matched) matchedTemplates[template] = templateName
}

if (matchedTemplates.isEmpty()) return null
log.info("all matching templates $matchedTemplates")

// sort by template priority
val winner = matchedTemplates.keys.maxBy { it.priority }
log.info("winner with highest priority is $winner")
return matchedTemplates[winner]
}

fun validateFormat(templateName: String, indexPatterns: List<String>) {
val validationErrors = mutableListOf<String>()
if (templateName.contains(" ")) {
validationErrors.add("name must not contain a space")
}
if (templateName.contains(",")) {
validationErrors.add("name must not contain a ','")
}
if (templateName.contains("#")) {
validationErrors.add("name must not contain a '#'")
}
if (templateName.contains("*")) {
validationErrors.add("name must not contain a '*'")
}
if (templateName.startsWith("_")) {
validationErrors.add("name must not start with '_'")
}
if (templateName.toLowerCase(Locale.ROOT) != templateName) {
validationErrors.add("name must be lower cased")
}
for (indexPattern in indexPatterns) {
if (indexPattern.contains(" ")) {
validationErrors.add("index_patterns [$indexPattern] must not contain a space")
}
if (indexPattern.contains(",")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a ','")
}
if (indexPattern.contains("#")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a '#'")
}
if (indexPattern.contains(":")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a ':'")
}
if (indexPattern.startsWith("_")) {
validationErrors.add("index_pattern [$indexPattern] must not start with '_'")
}
if (!Strings.validFileNameExcludingAstrix(indexPattern)) {
validationErrors.add("index_pattern [" + indexPattern + "] must not contain the following characters " +
Strings.INVALID_FILENAME_CHARS)
}
}

if (validationErrors.size > 0) {
val validationException = ValidationException()
validationException.addValidationErrors(validationErrors)
throw InvalidIndexTemplateException(templateName, validationException.message)
}
}

/**
* find templates whose index patterns overlap with given template
*
* @return map of overlapping template name to its index patterns
*/
// addIndexTemplateV2 findConflictingV2Templates
fun findConflictingISMTemplates(candidate: String, indexPatterns: List<String>, priority: Int, ismTemplates: Map<String, ISMTemplate>): Map<String, List<String>> {
// focus on template with same priority
val ismTemplates = ismTemplates.filter { it.value.priority == priority }
val automaton1 = Regex.simpleMatchToAutomaton(*indexPatterns.toTypedArray())
val overlappingTemplates = mutableMapOf<String, List<String>>()
ismTemplates.forEach { (templateName, template) ->
val automaton2 = Regex.simpleMatchToAutomaton(*template.indexPatterns.toTypedArray())
if (!Operations.isEmpty(Operations.intersection(automaton1, automaton2))) {
log.info("existing template $templateName overlaps candidate $candidate")
overlappingTemplates[templateName] = template.indexPatterns
}
}
overlappingTemplates.remove(candidate)
return overlappingTemplates
}
}
}
Loading