Skip to content

Commit

Permalink
Add extension point for custom TranslogDeletionPolicy in EnginePlugin. (
Browse files Browse the repository at this point in the history
#1404)

This commit adds a method that can be used to provide a custom TranslogDeletionPolicy 
from within plugins that implement the EnginePlugin interface. This enables plugins to 
provide a custom deletion policy with the current limitation that only one plugin can  
override the policy. An exception will be thrown if more than one plugin overrides the 
policy.
  • Loading branch information
adnapibar authored Oct 22, 2021
1 parent 8f56612 commit 2ebd0e0
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -70,6 +71,7 @@ public final class EngineConfig {
private final ShardId shardId;
private final IndexSettings indexSettings;
private final ByteSizeValue indexingBufferSize;
private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
private volatile boolean enableGcDeletes = true;
private final TimeValue flushMergesAfter;
private final String codecName;
Expand Down Expand Up @@ -145,10 +147,61 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {

private final TranslogConfig translogConfig;

public EngineConfig(
ShardId shardId,
ThreadPool threadPool,
IndexSettings indexSettings,
Engine.Warmer warmer,
Store store,
MergePolicy mergePolicy,
Analyzer analyzer,
Similarity similarity,
CodecService codecService,
Engine.EventListener eventListener,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig,
TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener,
Sort indexSort,
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
this(
shardId,
threadPool,
indexSettings,
warmer,
store,
mergePolicy,
analyzer,
similarity,
codecService,
eventListener,
queryCache,
queryCachingPolicy,
translogConfig,
null,
flushMergesAfter,
externalRefreshListener,
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier
);
}

/**
* Creates a new {@link org.opensearch.index.engine.EngineConfig}
*/
public EngineConfig(
EngineConfig(
ShardId shardId,
ThreadPool threadPool,
IndexSettings indexSettings,
Expand All @@ -162,6 +215,7 @@ public EngineConfig(
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig,
TranslogDeletionPolicyFactory translogDeletionPolicyFactory,
TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener,
Expand Down Expand Up @@ -200,6 +254,7 @@ public EngineConfig(
this.queryCache = queryCache;
this.queryCachingPolicy = queryCachingPolicy;
this.translogConfig = translogConfig;
this.translogDeletionPolicyFactory = translogDeletionPolicyFactory;
this.flushMergesAfter = flushMergesAfter;
this.externalRefreshListener = externalRefreshListener;
this.internalRefreshListener = internalRefreshListener;
Expand Down Expand Up @@ -423,4 +478,8 @@ public interface TombstoneDocSupplier {
public TombstoneDocSupplier getTombstoneDocSupplier() {
return tombstoneDocSupplier;
}

public TranslogDeletionPolicyFactory getCustomTranslogDeletionPolicyFactory() {
return translogDeletionPolicyFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.PluginsService;
Expand All @@ -38,7 +39,8 @@
* A factory to create an EngineConfig based on custom plugin overrides
*/
public class EngineConfigFactory {
private final Optional<CodecService> codecService;
private final CodecService codecService;
private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory;

/** default ctor primarily used for tests without plugins */
public EngineConfigFactory(IndexSettings idxSettings) {
Expand All @@ -56,6 +58,8 @@ public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSetti
EngineConfigFactory(Collection<EnginePlugin> enginePlugins, IndexSettings idxSettings) {
Optional<CodecService> codecService = Optional.empty();
String codecServiceOverridingPlugin = null;
Optional<TranslogDeletionPolicyFactory> translogDeletionPolicyFactory = Optional.empty();
String translogDeletionPolicyOverridingPlugin = null;
for (EnginePlugin enginePlugin : enginePlugins) {
// get overriding codec service from EnginePlugin
if (codecService.isPresent() == false) {
Expand All @@ -69,11 +73,23 @@ public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSetti
+ enginePlugin.getClass().getName()
);
}
if (translogDeletionPolicyFactory.isPresent() == false) {
translogDeletionPolicyFactory = enginePlugin.getCustomTranslogDeletionPolicyFactory();
translogDeletionPolicyOverridingPlugin = enginePlugin.getClass().getName();
} else {
throw new IllegalStateException(
"existing TranslogDeletionPolicyFactory is already overridden in: "
+ translogDeletionPolicyOverridingPlugin
+ " attempting to override again by: "
+ enginePlugin.getClass().getName()
);
}
}
this.codecService = codecService;
this.codecService = codecService.orElse(null);
this.translogDeletionPolicyFactory = translogDeletionPolicyFactory.orElse((idxs, rtls) -> null);
}

/** Insantiates a new EngineConfig from the provided custom overrides */
/** Instantiates a new EngineConfig from the provided custom overrides */
public EngineConfig newEngineConfig(
ShardId shardId,
ThreadPool threadPool,
Expand Down Expand Up @@ -108,11 +124,12 @@ public EngineConfig newEngineConfig(
mergePolicy,
analyzer,
similarity,
this.codecService.isPresent() == true ? this.codecService.get() : codecService,
this.codecService != null ? this.codecService : codecService,
eventListener,
queryCache,
queryCachingPolicy,
translogConfig,
translogDeletionPolicyFactory,
flushMergesAfter,
externalRefreshListener,
internalRefreshListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,21 @@ public InternalEngine(EngineConfig engineConfig) {
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles(),
engineConfig.retentionLeasesSupplier()
);
final TranslogDeletionPolicy translogDeletionPolicy;
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
.create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
}
if (customTranslogDeletionPolicy != null) {
translogDeletionPolicy = customTranslogDeletionPolicy;
} else {
translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
);
}
store.incRef();
IndexWriter writer = null;
Translog translog = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.seqno.RetentionLeases;

import java.util.function.Supplier;

@FunctionalInterface
public interface TranslogDeletionPolicyFactory {
TranslogDeletionPolicy create(IndexSettings settings, Supplier<RetentionLeases> supplier);
}
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/plugins/EnginePlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;

import java.util.Optional;
import java.util.function.Supplier;

/**
* A plugin that provides alternative engine implementations.
Expand All @@ -63,4 +67,17 @@ public interface EnginePlugin {
default Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.empty();
}

/**
* When an index is created this method is invoked for each engine plugin. Engine plugins that need to provide a
* custom {@link TranslogDeletionPolicy} can override this method to return a function that takes the {@link IndexSettings}
* and a {@link Supplier} for {@link RetentionLeases} and returns a custom {@link TranslogDeletionPolicy}.
*
* Only one of the installed Engine plugins can override this otherwise {@link IllegalStateException} will be thrown.
*
* @return a function that returns an instance of {@link TranslogDeletionPolicy}
*/
default Optional<TranslogDeletionPolicyFactory> getCustomTranslogDeletionPolicyFactory() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.engine;

import org.apache.logging.log4j.LogManager;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class EngineConfigFactoryTests extends OpenSearchTestCase {
public void testCreateEngineConfigFromFactory() {
IndexMetadata meta = IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
List<EnginePlugin> plugins = Collections.singletonList(new FooEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());
EngineConfigFactory factory = new EngineConfigFactory(plugins, indexSettings);

EngineConfig config = factory.newEngineConfig(
null,
null,
indexSettings,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
TimeValue.timeValueMinutes(5),
null,
null,
null,
null,
null,
() -> new RetentionLeases(0, 0, Collections.emptyList()),
null,
null
);

assertNotNull(config.getCodec());
assertNotNull(config.getCustomTranslogDeletionPolicyFactory());
assertTrue(config.getCustomTranslogDeletionPolicyFactory().create(indexSettings, null) instanceof CustomTranslogDeletionPolicy);
}

public void testCreateEngineConfigFromFactoryMultipleCodecServiceIllegalStateException() {
IndexMetadata meta = IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
List<EnginePlugin> plugins = Arrays.asList(new FooEnginePlugin(), new BarEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());

expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
}

public void testCreateEngineConfigFromFactoryMultipleCustomTranslogDeletionPolicyFactoryIllegalStateException() {
IndexMetadata meta = IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
List<EnginePlugin> plugins = Arrays.asList(new FooEnginePlugin(), new BazEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());

expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
}

private static class FooEnginePlugin extends Plugin implements EnginePlugin {
@Override
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
return Optional.empty();
}

@Override
public Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.of(new CodecService(null, LogManager.getLogger(getClass())));
}

@Override
public Optional<TranslogDeletionPolicyFactory> getCustomTranslogDeletionPolicyFactory() {
return Optional.of(CustomTranslogDeletionPolicy::new);
}
}

private static class BarEnginePlugin extends Plugin implements EnginePlugin {
@Override
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
return Optional.empty();
}

@Override
public Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.of(new CodecService(null, LogManager.getLogger(getClass())));
}
}

private static class BazEnginePlugin extends Plugin implements EnginePlugin {
@Override
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
return Optional.empty();
}

@Override
public Optional<TranslogDeletionPolicyFactory> getCustomTranslogDeletionPolicyFactory() {
return Optional.of(CustomTranslogDeletionPolicy::new);
}
}

private static class CustomTranslogDeletionPolicy extends TranslogDeletionPolicy {
public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier<RetentionLeases> retentionLeasesSupplier) {
super(
indexSettings.getTranslogRetentionSize().getBytes(),
indexSettings.getTranslogRetentionAge().getMillis(),
indexSettings.getTranslogRetentionTotalFiles()
);
}
}
}
Loading

0 comments on commit 2ebd0e0

Please sign in to comment.