Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding CheckpointRefreshListener to trigger when Segment replication is turned on and Primary shard refreshes #3108

Merged
merged 21 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4addc10
Intial PR adding classes and tests related to checkpoint publishing
Rishikesh1159 Apr 28, 2022
37f49d2
Putting a Draft PR with all changes in classes. Testing is still not …
Rishikesh1159 Apr 29, 2022
3210e33
Merge branch 'opensearch-project:main' into seg-rep/checkpoint-publish
Rishikesh1159 May 5, 2022
730b601
Wiring up index shard to new engine, spotless apply and removing unne…
Rishikesh1159 May 5, 2022
b216116
Merge branch 'seg-rep/checkpoint-publish' of https://github.com/Rishi…
Rishikesh1159 May 5, 2022
4928270
Adding Unit test for checkpointRefreshListener
Rishikesh1159 May 6, 2022
7777d1e
Merge branch 'main' into seg-rep/checkpoint-publish
Rishikesh1159 May 6, 2022
fa39d47
Applying spotless check
Rishikesh1159 May 6, 2022
5907f0b
Merge branch 'seg-rep/checkpoint-publish' of https://github.com/Rishi…
Rishikesh1159 May 6, 2022
e40dc41
Fixing import statements *
Rishikesh1159 May 6, 2022
5a9d544
Merge branch 'opensearch-project:main' into seg-rep/checkpoint-publish
Rishikesh1159 May 9, 2022
46557bb
removing unused constructor in index shard
Rishikesh1159 May 11, 2022
7ac0b90
Addressing comments from last commit
Rishikesh1159 May 13, 2022
ff91dec
Merge branch 'opensearch-project:main' into seg-rep/checkpoint-publish
Rishikesh1159 May 13, 2022
9bbfe20
Adding package-info.java files for two new packages
Rishikesh1159 May 13, 2022
4c97019
Merge branch 'opensearch-project:main' into seg-rep/checkpoint-publish
Rishikesh1159 May 19, 2022
bd17e4f
Merge branch 'opensearch-project:main' into seg-rep/checkpoint-publish
Rishikesh1159 May 23, 2022
6c75f6d
Adding test for null checkpoint publisher and addreesing PR comments
Rishikesh1159 May 23, 2022
d78e8ae
Merge branch 'main' into seg-rep/checkpoint-publish
Rishikesh1159 May 23, 2022
cf7c92e
Add docs for indexshardtests and remove shard.refresh
Rishikesh1159 May 23, 2022
591e608
Merge branch 'seg-rep/checkpoint-publish' of https://github.com/Rishi…
Rishikesh1159 May 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.DummyShardLock;
Expand Down Expand Up @@ -673,7 +674,8 @@ public static final IndexShard newIndexShard(
Arrays.asList(listeners),
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs
cbs,
SegmentReplicationCheckpointPublisher.EMPTY
);
}

Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -423,7 +424,8 @@ private long getAvgShardSizeInBytes() throws IOException {
public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -525,7 +527,8 @@ public synchronized IndexShard createShard(
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService
circuitBreakerService,
checkpointPublisher
Rishikesh1159 marked this conversation as resolved.
Show resolved Hide resolved
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;

import java.io.IOException;

/**
* A {@link ReferenceManager.RefreshListener} that publishes a checkpoint to be consumed by replicas.
* This class is only used with Segment Replication enabled.
*/
public class CheckpointRefreshListener implements ReferenceManager.RefreshListener {

protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class);

private final IndexShard shard;
private final SegmentReplicationCheckpointPublisher publisher;

public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointPublisher publisher) {
this.shard = shard;
this.publisher = publisher;
}

@Override
public void beforeRefresh() throws IOException {
// Do nothing
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
publisher.publish(shard);
}
}
}
26 changes: 24 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -299,6 +302,8 @@ Runnable getGlobalCheckpointSyncer() {
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;

private final CheckpointRefreshListener checkpointRefreshListener;

Rishikesh1159 marked this conversation as resolved.
Show resolved Hide resolved
public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
Expand All @@ -319,7 +324,8 @@ public IndexShard(
final List<IndexingOperationListener> listeners,
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService
final CircuitBreakerService circuitBreakerService,
final SegmentReplicationCheckpointPublisher checkpointPublisher
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking we should overload the constructor here (or subclass indexShard) and conditionally use it if segrep is enabled when IndexShards are created from IndexService.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic for creating an IndexShard should take of injecting the correct concrete implementation or noop as needed

) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -402,6 +408,7 @@ public boolean shouldCache(Query query) {
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher);
Rishikesh1159 marked this conversation as resolved.
Show resolved Hide resolved
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -1362,6 +1369,14 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}
}

public ReplicationCheckpoint getLatestReplicationCheckpoint() {
Rishikesh1159 marked this conversation as resolved.
Show resolved Hide resolved
return new ReplicationCheckpoint(shardId, 0, 0, 0, 0);
}

public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) {
// TODO
}
Rishikesh1159 marked this conversation as resolved.
Show resolved Hide resolved

/**
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
* without having to worry about the current state of the engine and concurrent flushes.
Expand Down Expand Up @@ -3100,6 +3115,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
}
};

final List<ReferenceManager.RefreshListener> internalRefreshListener;
if (indexSettings.isSegRepEnabled() && shardRouting.primary()) {
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
} else {
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't used anywhere? It should be wired up in newEngineConfig below.

Should this be an internal or external listener?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will wire this up. But not sure if it should be external or internal listener. I am using internal listener for now, we can have a discussion on this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets push this down to the creation of IndexShard so that we can minimize the branching across the board and create the right flavour of IndexShard for segrep

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I liked this idea at first, but I think we should maybe leave this block while the feature is still behind feature flags. Once the feature flag is removed then we wire up the listener even if its a NoOp.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick - can be written as:

List<ReferenceManager.RefreshListener> internalRefreshListeners = Arrays.asList(new RefreshMetricUpdater(refreshMetric));
if (this. checkpointRefreshListener != null) {
    internalRefreshListeners.add(checkpointRefreshListener);
}
internalRefreshListeners = Collections.unmodifiableList(internalRefreshListeners);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kartg this we how we have it now, The above highlighted one is outdated.

final List<ReferenceManager.RefreshListener> internalRefreshListener;
        if (this.checkpointRefreshListener != null) {
            internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
        } else {
            internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
        }

Also if we try to write it as your suggested way, then internalRefreshListeners cannot be final, we have been using it as final even before this PR. Not sure if changing internalRefreshListener to a non-final variable is a good idea.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this block declares and defines the internalRefreshListener variable, the final modifier doesn't add value IMO. What is more valuable to maintain is the unmodifiable list, which is lost if checkpointRefreshListener is non-null.

If you'd really like to retain the final modifier, I'd suggest doing something like this:

List<ReferenceManager.RefreshListener> refreshListenerList = Arrays.asList(new RefreshMetricUpdater(refreshMetric));
if (this. checkpointRefreshListener != null) {
    refreshListenerList.add(checkpointRefreshListener);
}
final List<ReferenceManager.RefreshListener> internalRefreshListener = Collections.unmodifiableList(refreshListenerList);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kartg I tried to incorporate your code block, but it throws me UnsupportedOperation Exception when we are adding, "refreshListenerList.add(checkpointRefreshListener);". As we are using Arrays.asList(new RefreshMetricUpdater(refreshMetric)); we cannot use .add method. Instead we have to again convert this into actual List. More about this exception can be found here. As this is a nitpick I am just using the way how it was before to avoid complexity

}

return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
Expand All @@ -3116,7 +3138,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.index.shard.PrimaryReplicaSyncer;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.plugins.MapperPlugin;
Expand Down Expand Up @@ -278,6 +279,7 @@ protected void configure() {
bind(RetentionLeaseSyncAction.class).asEagerSingleton();
bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton();
bind(RetentionLeaseSyncer.class).asEagerSingleton();
bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton();
Rishikesh1159 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -838,6 +839,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
@Override
public IndexShard createShard(
final ShardRouting shardRouting,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
Expand All @@ -852,7 +854,7 @@ public IndexShard createShard(
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
import org.opensearch.snapshots.SnapshotShardsService;
Expand Down Expand Up @@ -137,6 +138,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final Consumer<ShardId> globalCheckpointSyncer;
private final RetentionLeaseSyncer retentionLeaseSyncer;

private final SegmentReplicationCheckpointPublisher checkpointPublisher;

@Inject
public IndicesClusterStateService(
final Settings settings,
Expand All @@ -152,13 +155,15 @@ public IndicesClusterStateService(
final SnapshotShardsService snapshotShardsService,
final PrimaryReplicaSyncer primaryReplicaSyncer,
final GlobalCheckpointSyncAction globalCheckpointSyncAction,
final RetentionLeaseSyncer retentionLeaseSyncer
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
) {
this(
settings,
indicesService,
clusterService,
threadPool,
checkpointPublisher,
recoveryTargetService,
shardStateAction,
nodeMappingRefreshAction,
Expand All @@ -178,6 +183,7 @@ public IndicesClusterStateService(
final AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
final ClusterService clusterService,
final ThreadPool threadPool,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final PeerRecoveryTargetService recoveryTargetService,
final ShardStateAction shardStateAction,
final NodeMappingRefreshAction nodeMappingRefreshAction,
Expand All @@ -190,6 +196,7 @@ public IndicesClusterStateService(
final RetentionLeaseSyncer retentionLeaseSyncer
) {
this.settings = settings;
this.checkpointPublisher = checkpointPublisher;
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService);
this.indicesService = indicesService;
this.clusterService = clusterService;
Expand Down Expand Up @@ -623,6 +630,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
indicesService.createShard(
shardRouting,
checkpointPublisher,
recoveryTargetService,
new RecoveryListener(shardRouting, primaryTerm),
repositoriesService,
Expand Down Expand Up @@ -988,6 +996,7 @@ U createIndex(IndexMetadata indexMetadata, List<IndexEventListener> builtInIndex
*/
T createShard(
ShardRouting shardRouting,
SegmentReplicationCheckpointPublisher checkpointPublisher,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Expand Down
Loading