Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT - do not review #145

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ buildscript {

ext {
isSnapshot = "true" == System.getProperty("build.snapshot", "false")
opensearch_version = System.getProperty("opensearch.version", "1.1.0")
opensearch_version = System.getProperty("opensearch.version", "1.1.0-SNAPSHOT")
// Taken from https://github.com/opensearch-project/alerting/blob/main/build.gradle#L33
// 1.0.0 -> 1.0.0.0, and 1.0.0-SNAPSHOT -> 1.0.0.0-SNAPSHOT
opensearch_build = opensearch_version.replaceAll(/(\.\d)([^\d]*)$/, '$1.0$2')
Expand All @@ -43,6 +43,7 @@ buildscript {
mavenLocal()
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots/" }
}

dependencies {
Expand Down Expand Up @@ -120,6 +121,7 @@ repositories {
mavenLocal()
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots/" }
jcenter()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.index.engine

import org.opensearch.index.translog.ReplicationTranslogDeletionPolicy
import org.opensearch.index.translog.TranslogDeletionPolicy

/**
 * An [InternalEngine] whose translog deletion policy is a
 * [ReplicationTranslogDeletionPolicy].
 */
class LeaderReplicationEngine(config: EngineConfig) : InternalEngine(config) {

    /**
     * Builds the deletion policy from the index settings and the retention-leases
     * supplier carried by [engineConfig].
     */
    override fun getTranslogDeletionPolicy(engineConfig: EngineConfig): TranslogDeletionPolicy {
        val settingsForIndex = engineConfig.indexSettings
        val leasesSupplier = engineConfig.retentionLeasesSupplier()
        return ReplicationTranslogDeletionPolicy(settingsForIndex, leasesSupplier)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@
* GitHub history for details.
*/

package org.opensearch.replication
package org.opensearch.index.engine

import org.opensearch.index.engine.EngineConfig
import org.opensearch.index.engine.InternalEngine
import org.opensearch.index.seqno.SequenceNumbers

class ReplicationEngine(config: EngineConfig) : InternalEngine(config) {
Expand All @@ -24,7 +22,7 @@ class ReplicationEngine(config: EngineConfig) : InternalEngine(config) {
}

override fun generateSeqNoForOperationOnPrimary(operation: Operation): Long {
check(operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { "Expected valid sequence number for replicate op but was unassigned"}
check(operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { "Expected valid sequence number for replicate op but was unassigned" }
return operation.seqNo()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.opensearch.index.translog

import org.opensearch.index.IndexSettings
import org.opensearch.index.IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING
import org.opensearch.index.seqno.RetentionLease
import org.opensearch.index.seqno.RetentionLeases
import org.opensearch.replication.ReplicationPlugin
import java.io.IOException
import java.util.function.Supplier

/**
 * A [TranslogDeletionPolicy] that can additionally take retention leases into account
 * when deciding which translog generations are still required.
 *
 * The base policy is configured from the retention size, age and total-files values
 * exposed by [indexSettings]. Lease-based retention is applied on top of that only
 * while [ReplicationPlugin.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING]
 * is enabled in the index settings.
 *
 * @param indexSettings settings of the index this policy belongs to
 * @param retentionLeasesSupplier supplies the current retention leases on every evaluation
 */
class ReplicationTranslogDeletionPolicy(
    private val indexSettings: IndexSettings,
    private val retentionLeasesSupplier: Supplier<RetentionLeases>
) : TranslogDeletionPolicy(
    indexSettings.translogRetentionSize.bytes,
    indexSettings.translogRetentionAge.millis,
    indexSettings.translogRetentionTotalFiles
) {

    /**
     * Returns the minimum translog generation that is still required by the system. Any
     * generation below the returned value may be safely deleted.
     *
     * When retention-lease pruning is disabled, the result of the base-class policy is
     * returned unchanged. When it is enabled, the result is the smaller of the base-class
     * value and `max(minBySize, minByRetentionLeases)`, so a specified retention size
     * takes precedence over the leases.
     *
     * @param readers current translog readers
     * @param writer current translog writer
     * @return the lowest generation that must be kept
     */
    @Synchronized
    @Throws(IOException::class)
    override fun minTranslogGenRequired(readers: List<TranslogReader>, writer: TranslogWriter): Long {
        val minByTranslogGenSettings = super.minTranslogGenRequired(readers, writer)

        // Read on every call so that a change to the index settings is picked up.
        val pruningEnabled = indexSettings.settings.getAsBoolean(
            ReplicationPlugin.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key,
            false
        )
        if (!pruningEnabled) {
            // Leases must not influence translog retention unless pruning was opted into.
            return minByTranslogGenSettings
        }

        // NOTE(review): presumably IndexSettings reports -1 when it considers size-based
        // retention disabled; in that case fall back to the configured setting value — confirm.
        val reportedRetentionSize: Long = indexSettings.translogRetentionSize.bytes
        val retentionSizeInBytes: Long = if (reportedRetentionSize == -1L) {
            INDEX_TRANSLOG_RETENTION_SIZE_SETTING.get(indexSettings.settings).bytes
        } else {
            reportedRetentionSize
        }

        val minBySize: Long = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes)
        val minByRetentionLeases: Long = getMinTranslogGenByRetentionLease(readers, writer)

        // If retention size is specified, size takes precedence.
        return minOf(minByTranslogGenSettings, maxOf(minBySize, minByRetentionLeases))
    }

    /**
     * Returns the lowest generation among the readers whose sequence-number range
     * contains the minimum retaining sequence number of the current leases, or the
     * writer's generation when no reader matches.
     */
    private fun getMinTranslogGenByRetentionLease(readers: List<TranslogReader>, writer: TranslogWriter): Long {
        val minimumRetainingSequenceNumber = minimumRetainingSequenceNumber()

        var minGen: Long = writer.getGeneration()
        for (reader in readers) {
            val checkpoint = reader.getCheckpoint()
            val coversRetainedSeqNo = checkpoint.minSeqNo <= minimumRetainingSequenceNumber &&
                checkpoint.maxSeqNo >= minimumRetainingSequenceNumber
            if (coversRetainedSeqNo) {
                minGen = minOf(minGen, reader.getGeneration())
            }
        }
        return minGen
    }

    /**
     * Smallest retaining sequence number across all current leases, or
     * [Long.MAX_VALUE] when there are no leases.
     */
    private fun minimumRetainingSequenceNumber(): Long =
        retentionLeasesSupplier.get()
            .leases()
            .stream()
            .mapToLong(RetentionLease::retainingSequenceNumber)
            .min()
            .orElse(Long.MAX_VALUE)
}
17 changes: 14 additions & 3 deletions src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
import org.opensearch.index.IndexSettings
import org.opensearch.index.engine.EngineFactory
import org.opensearch.index.engine.LeaderReplicationEngine
import org.opensearch.index.engine.ReplicationEngine
import org.opensearch.indices.recovery.RecoverySettings
import org.opensearch.persistent.PersistentTaskParams
import org.opensearch.persistent.PersistentTaskState
Expand Down Expand Up @@ -153,6 +155,9 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
const val REPLICATION_EXECUTOR_NAME_FOLLOWER = "replication_follower"
val REPLICATED_INDEX_SETTING: Setting<String> = Setting.simpleString("index.plugins.replication.replicated",
Setting.Property.InternalIndex, Setting.Property.IndexScope)
val INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING: Setting<Boolean> = Setting.boolSetting(
"index.translog.retention_lease.pruning.enabled", false,
Setting.Property.IndexScope, Setting.Property.Dynamic)
val REPLICATION_FOLLOWER_OPS_BATCH_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.follower.index.ops_batch_size", 50000, 16,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_LEADER_THREADPOOL_SIZE: Setting<Int> = Setting.intSetting("plugins.replication.leader.thread_pool.size", 0, 0,
Expand Down Expand Up @@ -325,7 +330,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
}

override fun getSettings(): List<Setting<*>> {
return listOf(REPLICATED_INDEX_SETTING, REPLICATION_FOLLOWER_OPS_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE,
return listOf(REPLICATED_INDEX_SETTING, INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING, REPLICATION_FOLLOWER_OPS_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE,
REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD,
REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS,
REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL,
Expand All @@ -341,10 +346,16 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
}

override fun getEngineFactory(indexSettings: IndexSettings): Optional<EngineFactory> {
// The presence of REPLICATED_INDEX_SETTING indicates we are wiring up a follower cluster.
// In this case use the ReplicationEngine
return if (indexSettings.settings.get(REPLICATED_INDEX_SETTING.key) != null) {
Optional.of(EngineFactory { config -> org.opensearch.replication.ReplicationEngine(config) })
Optional.of(EngineFactory { config -> ReplicationEngine(config) })
} else {
Optional.empty()
// Otherwise we are wiring up a leader cluster. It should use LeaderReplicationEngine,
// which provides custom TranslogDeletionPolicy logic that prunes the translog
// based on retention leases when the setting
// INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING is enabled.
Optional.of(EngineFactory { config -> LeaderReplicationEngine(config) })
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import org.opensearch.common.inject.Inject
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.unit.TimeValue
import org.opensearch.index.IndexSettings
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.replication.ReplicationPlugin.Companion.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_EXECUTOR_NAME_LEADER
import org.opensearch.replication.seqno.RemoteClusterStats
import org.opensearch.replication.seqno.RemoteClusterTranslogService
Expand Down Expand Up @@ -146,7 +146,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus

private fun isTranslogPruningByRetentionLeaseEnabled(shardId: ShardId): Boolean {
val enabled = clusterService.state().metadata.indices.get(shardId.indexName)
?.settings?.getAsBoolean(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key, false)
?.settings?.getAsBoolean(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key, false)
if(enabled != null) {
return enabled
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package org.opensearch.replication.repository

import org.opensearch.replication.ReplicationPlugin
import org.opensearch.replication.ReplicationPlugin.Companion.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING
import org.opensearch.replication.ReplicationSettings
import org.opensearch.replication.action.repository.GetStoreMetadataAction
import org.opensearch.replication.action.repository.GetStoreMetadataRequest
Expand Down Expand Up @@ -239,7 +240,7 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata
builder.put(ReplicationPlugin.REPLICATED_INDEX_SETTING.key, replicatedIndex)

// Remove translog pruning for the follower index
builder.remove(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key)
builder.remove(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key)

val indexMdBuilder = IndexMetadata.builder(indexMetadata).settings(builder)
indexMetadata.aliases.valuesIt().forEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class RemoteClusterTranslogService : AbstractLifecycleComponent(){

override fun doClose() {
}

public fun getHistoryOfOperations(indexShard: IndexShard, startSeqNo: Long, toSeqNo: Long): List<Translog.Operation> {
if(!indexShard.hasCompleteHistoryOperations(SOURCE_NAME, Engine.HistorySource.TRANSLOG, startSeqNo)) {
log.debug("Doesn't have history of operations starting from $startSeqNo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

package org.opensearch.replication.task.index

import org.opensearch.replication.ReplicationException
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATED_INDEX_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING
import org.opensearch.replication.ReplicationSettings
import org.opensearch.replication.action.index.block.IndexBlockUpdateType
import org.opensearch.replication.action.index.block.UpdateIndexBlockAction
Expand Down Expand Up @@ -78,7 +78,6 @@ import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.index.Index
import org.opensearch.index.IndexService
import org.opensearch.index.IndexSettings
import org.opensearch.index.IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING
import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.indices.cluster.IndicesClusterStateService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import org.opensearch.index.mapper.MapperService
import org.opensearch.repositories.fs.FsRepository
import org.opensearch.test.OpenSearchTestCase.assertBusy
import org.junit.Assert
import org.opensearch.replication.ReplicationPlugin.Companion.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING
import org.opensearch.replication.followerStats
import org.opensearch.replication.leaderStats
import org.opensearch.replication.task.index.IndexReplicationExecutor.Companion.log
Expand Down Expand Up @@ -384,13 +385,13 @@ class StartReplicationIT: MultiClusterRestTestCase() {
.isEqualTo(true)
assertThat(followerClient.indices()
.getSettings(GetSettingsRequest().indices(followerIndexName), RequestOptions.DEFAULT)
.getSetting(followerIndexName, IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key)
.getSetting(followerIndexName, INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key)
.isNullOrEmpty())
}

assertThat(leaderClient.indices()
.getSettings(GetSettingsRequest().indices(leaderIndexName), RequestOptions.DEFAULT)
.getSetting(leaderIndexName, IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key) == "true")
.getSetting(leaderIndexName, INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key) == "true")

} finally {
followerClient.stopReplication(followerIndexName)
Expand All @@ -414,7 +415,7 @@ class StartReplicationIT: MultiClusterRestTestCase() {
.isEqualTo(true)
}
// Turn-off the settings and index doc
val settingsBuilder = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key, false)
val settingsBuilder = Settings.builder().put(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.key, false)
val settingsUpdateResponse = leaderClient.indices().putSettings(UpdateSettingsRequest(leaderIndexName)
.settings(settingsBuilder.build()), RequestOptions.DEFAULT)
Assert.assertEquals(settingsUpdateResponse.isAcknowledged, true)
Expand Down