Extract SegmentReplicator class from SegmentReplicationTargetService.
This change separates the code that initiates replication from the target service component, in preparation
for implementing a task that initiates replication events on an interval.

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
mch2 committed Aug 29, 2024
1 parent 9014894 commit dfe17be
Showing 3 changed files with 222 additions and 115 deletions.
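
Note on the new dependency: the diff below replaces direct use of a ReplicationCollection (and a local completedReplications map) with calls into the new SegmentReplicator class. That class is introduced in one of the changed files not rendered on this page, so the skeleton below is only a sketch of the surface implied by the call sites in this diff; every signature is inferred rather than quoted, and the bodies are placeholders.

// Sketch inferred from call sites in this diff; assumed to live in
// org.opensearch.indices.replication alongside the types it references.
public class SegmentReplicator {

    public SegmentReplicator(ThreadPool threadPool) {
        // assumed: keeps the pool to run replication events asynchronously
    }

    // Builds and starts a target; replaces the inline construction that
    // SegmentReplicationTargetService#startReplication performed before this change.
    SegmentReplicationTarget startReplication(
        IndexShard indexShard,
        ReplicationCheckpoint checkpoint,
        SegmentReplicationSource source,
        SegmentReplicationTargetService.SegmentReplicationListener listener
    ) {
        throw new UnsupportedOperationException("sketch only");
    }

    // Queues an already-built target; pkg-private path kept for integration tests.
    void startReplication(SegmentReplicationTarget target) {}

    void cancel(ShardId shardId, String reason) {} // was onGoingReplications.cancelForShard(...)

    int size() { return 0; } // was onGoingReplications.size()

    SegmentReplicationTarget get(ShardId shardId) { return null; } // was getOngoingReplicationTarget(...)

    ReplicationCollection.ReplicationRef<SegmentReplicationTarget> get(long replicationId) { return null; }

    ReplicationCollection.ReplicationRef<SegmentReplicationTarget> get(long id, ShardId shardId) { return null; } // was getSafe(...)

    SegmentReplicationState getCompleted(ShardId shardId) { return null; } // was completedReplications.get(...)
}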
@@ -11,9 +11,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.lucene.index.CorruptIndexException;
 import org.opensearch.ExceptionsHelper;
-import org.opensearch.OpenSearchCorruptionException;
 import org.opensearch.action.support.ChannelActionListener;
 import org.opensearch.cluster.ClusterChangedEvent;
 import org.opensearch.cluster.ClusterStateListener;
@@ -24,7 +22,6 @@
 import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.CancellableThreads;
-import org.opensearch.common.util.concurrent.AbstractRunnable;
 import org.opensearch.common.util.concurrent.ConcurrentCollections;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.index.shard.ShardId;
@@ -33,7 +30,6 @@
 import org.opensearch.index.shard.IndexEventListener;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardState;
-import org.opensearch.index.store.Store;
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.indices.recovery.ForceSyncRequest;
@@ -61,7 +57,7 @@
 import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT;
 
 /**
- * Service class that orchestrates replication events on replicas.
+ * Service class that handles incoming checkpoints to initiate replication events on replicas.
  *
  * @opensearch.internal
  */
@@ -72,17 +68,14 @@ public class SegmentReplicationTargetService extends AbstractLifecycleComponent
     private final ThreadPool threadPool;
     private final RecoverySettings recoverySettings;
 
-    private final ReplicationCollection<SegmentReplicationTarget> onGoingReplications;
-
-    private final Map<ShardId, SegmentReplicationState> completedReplications = ConcurrentCollections.newConcurrentMap();
-
     private final SegmentReplicationSourceFactory sourceFactory;
 
     protected final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap();
 
     private final IndicesService indicesService;
     private final ClusterService clusterService;
     private final TransportService transportService;
+    private final SegmentReplicator replicator;
 
     /**
      * The internal actions
@@ -94,6 +87,7 @@ public static class Actions {
         public static final String FORCE_SYNC = "internal:index/shard/replication/segments_sync";
     }
 
+    @Deprecated
     public SegmentReplicationTargetService(
         final ThreadPool threadPool,
         final RecoverySettings recoverySettings,
@@ -113,6 +107,7 @@ public SegmentReplicationTargetService(
         );
     }
 
+    @Deprecated
     public SegmentReplicationTargetService(
         final ThreadPool threadPool,
         final RecoverySettings recoverySettings,
@@ -121,14 +116,34 @@ public SegmentReplicationTargetService(
         final IndicesService indicesService,
         final ClusterService clusterService,
         final ReplicationCollection<SegmentReplicationTarget> ongoingSegmentReplications
     ) {
+        this(
+            threadPool,
+            recoverySettings,
+            transportService,
+            sourceFactory,
+            indicesService,
+            clusterService,
+            new SegmentReplicator(threadPool)
+        );
+    }
+
+    public SegmentReplicationTargetService(
+        final ThreadPool threadPool,
+        final RecoverySettings recoverySettings,
+        final TransportService transportService,
+        final SegmentReplicationSourceFactory sourceFactory,
+        final IndicesService indicesService,
+        final ClusterService clusterService,
+        final SegmentReplicator replicator
+    ) {
         this.threadPool = threadPool;
         this.recoverySettings = recoverySettings;
-        this.onGoingReplications = ongoingSegmentReplications;
         this.sourceFactory = sourceFactory;
         this.indicesService = indicesService;
         this.clusterService = clusterService;
         this.transportService = transportService;
+        this.replicator = replicator;
 
         transportService.registerRequestHandler(
             Actions.FILE_CHUNK,
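
With the injected-replicator constructor above, node wiring becomes explicit. A usage sketch with hypothetical variable names (the two @Deprecated constructors keep existing call sites working by constructing a default SegmentReplicator themselves):

SegmentReplicator replicator = new SegmentReplicator(threadPool);
SegmentReplicationTargetService targetService = new SegmentReplicationTargetService(
    threadPool,
    recoverySettings,
    transportService,
    sourceFactory,
    indicesService,
    clusterService,
    replicator // injected, so a future interval-driven task can drive the same instance
);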
@@ -154,7 +169,7 @@ protected void doStart() {
     @Override
     protected void doStop() {
         if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
-            assert onGoingReplications.size() == 0 : "Replication collection should be empty on shutdown";
+            assert replicator.size() == 0 : "Replication collection should be empty on shutdown";
             clusterService.removeListener(this);
         }
     }
@@ -199,7 +214,7 @@ public void clusterChanged(ClusterChangedEvent event) {
     @Override
     public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
         if (indexShard != null && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()) {
-            onGoingReplications.cancelForShard(indexShard.shardId(), "Shard closing");
+            replicator.cancel(indexShard.shardId(), "Shard closing");
             latestReceivedCheckpoint.remove(shardId);
         }
     }
@@ -224,7 +239,7 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
             && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()
             && oldRouting.primary() == false
             && newRouting.primary()) {
-            onGoingReplications.cancelForShard(indexShard.shardId(), "Shard has been promoted to primary");
+            replicator.cancel(indexShard.shardId(), "Shard has been promoted to primary");
             latestReceivedCheckpoint.remove(indexShard.shardId());
         }
     }
@@ -234,17 +249,15 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
      */
     @Nullable
     public SegmentReplicationState getOngoingEventSegmentReplicationState(ShardId shardId) {
-        return Optional.ofNullable(onGoingReplications.getOngoingReplicationTarget(shardId))
-            .map(SegmentReplicationTarget::state)
-            .orElse(null);
+        return Optional.ofNullable(replicator.get(shardId)).map(SegmentReplicationTarget::state).orElse(null);
     }
 
     /**
      * returns SegmentReplicationState of latest completed segment replication events.
      */
     @Nullable
     public SegmentReplicationState getlatestCompletedEventSegmentReplicationState(ShardId shardId) {
-        return completedReplications.get(shardId);
+        return replicator.getCompleted(shardId);
     }
 
     /**
@@ -257,11 +270,11 @@ public SegmentReplicationState getSegmentReplicationState(ShardId shardId) {
     }
 
     public ReplicationRef<SegmentReplicationTarget> get(long replicationId) {
-        return onGoingReplications.get(replicationId);
+        return replicator.get(replicationId);
     }
 
     public SegmentReplicationTarget get(ShardId shardId) {
-        return onGoingReplications.getOngoingReplicationTarget(shardId);
+        return replicator.get(shardId);
     }
 
     /**
@@ -285,7 +298,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {
             // checkpoint to be replayed once the shard is Active.
             if (replicaShard.state().equals(IndexShardState.STARTED) == true) {
                 // Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
-                SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId());
+                SegmentReplicationTarget ongoingReplicationTarget = replicator.get(replicaShard.shardId());
                 if (ongoingReplicationTarget != null) {
                     if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
                         logger.debug(
@@ -504,28 +517,12 @@ public SegmentReplicationTarget startReplication(
         final ReplicationCheckpoint checkpoint,
         final SegmentReplicationListener listener
     ) {
-        final SegmentReplicationTarget target = new SegmentReplicationTarget(
-            indexShard,
-            checkpoint,
-            sourceFactory.get(indexShard),
-            listener
-        );
-        startReplication(target);
-        return target;
+        return replicator.startReplication(indexShard, checkpoint, sourceFactory.get(indexShard), listener);
     }
 
     // pkg-private for integration tests
     void startReplication(final SegmentReplicationTarget target) {
-        final long replicationId;
-        try {
-            replicationId = onGoingReplications.startSafe(target, recoverySettings.activityTimeout());
-        } catch (ReplicationFailedException e) {
-            // replication already running for shard.
-            target.fail(e, false);
-            return;
-        }
-        logger.trace(() -> new ParameterizedMessage("Added new replication to collection {}", target.description()));
-        threadPool.generic().execute(new ReplicationRunner(replicationId));
+        replicator.startReplication(target);
     }
 
     /**
@@ -550,89 +547,14 @@ default void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
         void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
     }
 
-    /**
-     * Runnable implementation to trigger a replication event.
-     */
-    private class ReplicationRunner extends AbstractRunnable {
-
-        final long replicationId;
-
-        public ReplicationRunner(long replicationId) {
-            this.replicationId = replicationId;
-        }
-
-        @Override
-        public void onFailure(Exception e) {
-            onGoingReplications.fail(replicationId, new ReplicationFailedException("Unexpected Error during replication", e), false);
-        }
-
-        @Override
-        public void doRun() {
-            start(replicationId);
-        }
-    }
-
-    private void start(final long replicationId) {
-        final SegmentReplicationTarget target;
-        try (ReplicationRef<SegmentReplicationTarget> replicationRef = onGoingReplications.get(replicationId)) {
-            // This check is for handling edge cases where the reference is removed before the ReplicationRunner is started by the
-            // threadpool.
-            if (replicationRef == null) {
-                return;
-            }
-            target = replicationRef.get();
-        }
-        target.startReplication(new ActionListener<>() {
-            @Override
-            public void onResponse(Void o) {
-                logger.debug(() -> new ParameterizedMessage("Finished replicating {} marking as done.", target.description()));
-                onGoingReplications.markAsDone(replicationId);
-                if (target.state().getIndex().recoveredFileCount() != 0 && target.state().getIndex().recoveredBytes() != 0) {
-                    completedReplications.put(target.shardId(), target.state());
-                }
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                logger.debug("Replication failed {}", target.description());
-                if (isStoreCorrupt(target) || e instanceof CorruptIndexException || e instanceof OpenSearchCorruptionException) {
-                    onGoingReplications.fail(replicationId, new ReplicationFailedException("Store corruption during replication", e), true);
-                    return;
-                }
-                onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), false);
-            }
-        });
-    }
-
-    private boolean isStoreCorrupt(SegmentReplicationTarget target) {
-        // ensure target is not already closed. In that case
-        // we can assume the store is not corrupt and that the replication
-        // event completed successfully.
-        if (target.refCount() > 0) {
-            final Store store = target.store();
-            if (store.tryIncRef()) {
-                try {
-                    return store.isMarkedCorrupted();
-                } catch (IOException ex) {
-                    logger.warn("Unable to determine if store is corrupt", ex);
-                    return false;
-                } finally {
-                    store.decRef();
-                }
-            }
-        }
-        // store already closed.
-        return false;
-    }
-
     private class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> {
 
         // How many bytes we've copied since we last called RateLimiter.pause
         final AtomicLong bytesSinceLastPause = new AtomicLong();
 
         @Override
         public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
-            try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
+            try (ReplicationRef<SegmentReplicationTarget> ref = replicator.get(request.recoveryId(), request.shardId())) {
                 final SegmentReplicationTarget target = ref.get();
                 final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
                 target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener);
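
The ReplicationRunner, start(long), and isStoreCorrupt code deleted above is relocated rather than dropped: per the commit message, initiating replication now belongs to SegmentReplicator. Assuming the moved code mirrors what was removed, the replicator's internal start path would look roughly like the sketch below. This is not the merged source; in particular, how the activity timeout reaches the replicator is not visible on this page.

// Inside SegmentReplicator (assumed): same shape as the startReplication body deleted above.
void startReplication(SegmentReplicationTarget target) {
    final long replicationId;
    try {
        // assumes the replicator now owns the ReplicationCollection and a timeout value
        replicationId = onGoingReplications.startSafe(target, activityTimeout);
    } catch (ReplicationFailedException e) {
        target.fail(e, false); // a replication is already running for this shard
        return;
    }
    // the deleted ReplicationRunner (an AbstractRunnable that routes unexpected errors
    // to onGoingReplications.fail) and the corrupt-store check would run the target here
    threadPool.generic().execute(new ReplicationRunner(replicationId));
}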
