Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Move metadata #280

Merged
merged 22 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ ext {
}

group = "com.amazon.opendistroforelasticsearch"
version = "${opendistroVersion}.0"
version = "${opendistroVersion}.1"

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${es_version}"
Expand Down Expand Up @@ -315,3 +315,46 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
compileKotlin { kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] }

apply from: 'build-tools/pkgbuild.gradle'

// This IT is to simulate the situation
// when there are old version (without metadata change)
// and new version mixed in one cluster
import org.elasticsearch.gradle.test.RestIntegTestTask

def mixedClusterTest = project.tasks.create('mixedCluster', RestIntegTestTask.class)
def mixedClusterFlag = findProperty('mixed') as Boolean ?: false
println("mixed cluster flag: $mixedClusterFlag")
mixedClusterTest.dependsOn(bundlePlugin)

testClusters.mixedCluster {
testDistribution = "OSS"
if (_numNodes > 1) numberOfNodes = _numNodes
getNodes().each { node ->
node.plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree("src/test/resources/job-scheduler").getSingleFile() }
}
}))

if (mixedClusterFlag && node.name == "mixedCluster-1") {
node.plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree("src/test/resources/index-management").getSingleFile() }
}
}))
} else {
node.plugin(project.tasks.bundlePlugin.archiveFile)
}
node.plugins.each { println("plugin in the node: ${it.get()}") }
}
setting 'path.repo', repo.absolutePath
}

mixedCluster {
systemProperty 'tests.security.manager', 'false'
systemProperty 'tests.path.repo', repo.absolutePath
systemProperty 'cluster.mixed', "$mixedClusterFlag"
systemProperty 'cluster.number_of_nodes', "${_numNodes}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ class IndexManagementIndices(
}
}

suspend fun attemptUpdateConfigIndexMapping(): Boolean {
return try {
val response: AcknowledgedResponse = client.suspendUntil { IndexUtils.checkAndUpdateConfigIndexMapping(clusterService.state(), client, it) }
if (response.isAcknowledged) return true
logger.error("Trying to update config index mapping not acknowledged.")
return false
} catch (e: Exception) {
logger.error("Failed when trying to update config index mapping.", e)
false
}
}

/**
* ============== History =============
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package com.amazon.opendistroforelasticsearch.indexmanagement
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.IndexStateManagementHistory
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.MetadataService
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.SkipExecution
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestAddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction
Expand Down Expand Up @@ -165,6 +168,9 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
RollupMetadata.ROLLUP_METADATA_TYPE -> {
return@ScheduledJobParser null
}
ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE -> {
return@ScheduledJobParser null
}
else -> {
logger.warn("Unsupported document was indexed in $INDEX_MANAGEMENT_INDEX with type: $fieldName")
xcp.skipChildren()
Expand Down Expand Up @@ -218,13 +224,6 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
): Collection<Any> {
val settings = environment.settings()
this.clusterService = clusterService
val managedIndexRunner = ManagedIndexRunner
.registerClient(client)
.registerClusterService(clusterService)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerConsumers() // registerConsumers must happen after registerSettings/clusterService
val rollupRunner = RollupRunner
.registerClient(client)
.registerClusterService(clusterService)
Expand All @@ -239,6 +238,8 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
.registerConsumers()
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService)
val indexStateManagementHistory =
IndexStateManagementHistory(
Expand All @@ -249,8 +250,20 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
indexManagementIndices
)

val managedIndexRunner = ManagedIndexRunner
.registerClient(client)
.registerClusterService(clusterService)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerConsumers() // registerConsumers must happen after registerSettings/clusterService
.registerHistoryIndex(indexStateManagementHistory)
.registerSkipFlag(skipFlag)

val metadataService = MetadataService(client, clusterService, skipFlag, indexManagementIndices)

val managedIndexCoordinator = ManagedIndexCoordinator(environment.settings(),
client, clusterService, threadPool, indexManagementIndices)
client, clusterService, threadPool, indexManagementIndices, metadataService)

return listOf(managedIndexRunner, rollupRunner, indexManagementIndices, managedIndexCoordinator, indexStateManagementHistory)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexmanagement.util._DOC
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.DocWriteRequest
Expand All @@ -46,6 +47,7 @@ import org.elasticsearch.threadpool.Scheduler
import org.elasticsearch.threadpool.ThreadPool
import java.time.Instant

@OpenForTesting
class IndexStateManagementHistory(
settings: Settings,
private val client: Client,
Expand Down
Loading