Skip to content

Commit

Permalink
Merge branch 'main' into cs-query-profile-stats
Browse files Browse the repository at this point in the history
Signed-off-by: Ticheng Lin <51488860+ticheng-aws@users.noreply.github.com>
  • Loading branch information
ticheng-aws authored Jun 8, 2023
2 parents 5eedf6e + 1803fd9 commit e5ba28c
Show file tree
Hide file tree
Showing 51 changed files with 2,031 additions and 190 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))

### Dependencies
- Bump `jackson` from 2.15.1 to 2.15.2 ([#7897](https://github.com/opensearch-project/OpenSearch/pull/7897))
Expand All @@ -100,14 +101,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Replace jboss-annotations-api_1.2_spec with jakarta.annotation-api ([#7836](https://github.com/opensearch-project/OpenSearch/pull/7836))
- Add min, max, average and thread info to resource stats in tasks API ([#7673](https://github.com/opensearch-project/OpenSearch/pull/7673))
- Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321))
- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#7118](https://github.com/opensearch-project/OpenSearch/pull/7118))
- Add new query profile collector fields with concurrent search execution ([#7898](https://github.com/opensearch-project/OpenSearch/pull/7898))

### Deprecated

### Removed

### Fixed
- Fixing error: adding a new/forgotten parameter to the configuration for checking the config on startup in plugins/repository-s3 #7924
- Fixing error: adding a new/forgotten parameter to the configuration for checking the config on startup in plugins/repository-s3 #7924

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,8 @@ public ClusterState.Custom randomCreate(String name) {
Map.of(),
null,
SnapshotInfoTests.randomUserMetadata(),
randomVersion(random())
randomVersion(random()),
false
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.lease.Releasable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
Expand Down Expand Up @@ -189,7 +192,14 @@ public void testCancelPrimaryAllocation() throws Exception {
public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(INDEX_NAME);
final Settings settings = Settings.builder()
.put(indexSettings())
.put(
EngineConfig.INDEX_CODEC_SETTING.getKey(),
randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC, CodecService.LUCENE_DEFAULT_CODEC)
)
.build();
createIndex(INDEX_NAME, settings);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
Expand Down Expand Up @@ -580,6 +590,7 @@ public void testDeleteOperations() throws Exception {
* from xlog.
*/
public void testReplicationPostDeleteAndForceMerge() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replica = internalCluster().startNode();
Expand Down Expand Up @@ -785,6 +796,10 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}

public void testPressureServiceStats() throws Exception {
assumeFalse(
"Skipping the test as pressure service is not compatible with SegRep and Remote store yet.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
Expand Down Expand Up @@ -874,6 +889,7 @@ public void testPressureServiceStats() throws Exception {
* @throws Exception when issue is encountered
*/
public void testScrollCreatedOnReplica() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -963,6 +979,11 @@ public void testScrollCreatedOnReplica() throws Exception {
* @throws Exception when issue is encountered
*/
public void testScrollWithOngoingSegmentReplication() throws Exception {
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store yet.",
segmentReplicationWithRemoteEnabled()
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
prepareCreate(
Expand Down Expand Up @@ -1249,4 +1270,8 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
waitForSearchableDocs(2, nodes);
}

/**
 * Returns {@code true} when this suite runs segment replication backed by the remote store.
 *
 * Both conditions must hold:
 * <ul>
 *   <li>the index-level remote-store setting is enabled in {@code indexSettings()}
 *       (a subclass such as the remote-store IT overrides it to {@code true}), and</li>
 *   <li>the {@code SEGMENT_REPLICATION_EXPERIMENTAL} feature flag is set to "true"
 *       in {@code featureFlagSettings()}.</li>
 * </ul>
 * Used by tests to skip cases that are flaky or not yet supported with the
 * remote-store-backed replication source.
 */
private boolean segmentReplicationWithRemoteEnabled() {
    // NOTE(review): relies on subclasses overriding indexSettings()/featureFlagSettings();
    // in the base suite both default to disabled, so this returns false there.
    return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue()
        && "true".equalsIgnoreCase(featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {

private static final String REPOSITORY_NAME = "test-remore-store-repo";
private static final String REPOSITORY_NAME = "test-remote-store-repo";

@Override
public Settings indexSettings() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.SegmentReplicationIT;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
 * Runs the full {@link SegmentReplicationIT} suite with segment replication sourced from the
 * remote store. Setup is similar to SegmentReplicationRemoteStoreIT, but this suite additionally
 * enables segment replication using the remote store, which is gated behind the
 * SEGMENT_REPLICATION_EXPERIMENTAL feature flag. Once that flag graduates from experimental,
 * the two suites can be merged into a single Segment/Remote-store integration test suite.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationUsingRemoteStoreIT extends SegmentReplicationIT {

    // Name of the filesystem snapshot repository backing the remote segment store for each test.
    private static final String REPOSITORY_NAME = "test-remote-store-repo";

    /**
     * Index settings for every index created by the inherited tests: remote segment store
     * enabled and pointed at {@link #REPOSITORY_NAME}; remote translog explicitly disabled
     * so only segments are served from the remote store.
     */
    @Override
    public Settings indexSettings() {
        return Settings.builder()
            .put(super.indexSettings())
            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false)
            .build();
    }

    /**
     * Feature flags for the test cluster: REMOTE_STORE makes the remote-store settings valid,
     * and SEGMENT_REPLICATION_EXPERIMENTAL switches replicas to replicate from the remote store.
     */
    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder()
            .put(super.featureFlagSettings())
            .put(FeatureFlags.REMOTE_STORE, "true")
            .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
            .build();
    }

    /**
     * Per-test setup: start a dedicated cluster-manager node (suite runs with numDataNodes = 0)
     * and register a local "fs" repository at a fresh random path to act as the remote store.
     */
    @Before
    public void setup() {
        internalCluster().startClusterManagerOnlyNode();
        Path absolutePath = randomRepoPath().toAbsolutePath();
        assertAcked(
            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
        );
    }

    /**
     * Per-test teardown: remove the repository so the TEST-scoped cluster starts clean
     * for the next test.
     */
    @After
    public void teardown() {
        assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public static Entry startedEntry(
long repositoryStateId,
final Map<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
return new SnapshotsInProgress.Entry(
snapshot,
Expand All @@ -127,7 +128,8 @@ public static Entry startedEntry(
shards,
null,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}

Expand Down Expand Up @@ -164,7 +166,8 @@ public static Entry startClone(
Collections.emptyMap(),
version,
source,
Map.of()
Map.of(),
false // TODO: need to pull this value from the original snapshot, use whatever we set during snapshot create.
);
}

Expand All @@ -177,6 +180,7 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
private final boolean remoteStoreIndexShallowCopy;
private final boolean partial;
/**
* Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
Expand Down Expand Up @@ -219,7 +223,8 @@ public Entry(
final Map<ShardId, ShardSnapshotStatus> shards,
String failure,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
this(
snapshot,
Expand All @@ -235,7 +240,8 @@ public Entry(
userMetadata,
version,
null,
Map.of()
Map.of(),
remoteStoreIndexShallowCopy
);
}

Expand All @@ -253,7 +259,8 @@ private Entry(
final Map<String, Object> userMetadata,
Version version,
@Nullable SnapshotId source,
@Nullable final Map<RepositoryShardId, ShardSnapshotStatus> clones
@Nullable final Map<RepositoryShardId, ShardSnapshotStatus> clones,
boolean remoteStoreIndexShallowCopy
) {
this.state = state;
this.snapshot = snapshot;
Expand All @@ -274,6 +281,7 @@ private Entry(
} else {
this.clones = Collections.unmodifiableMap(clones);
}
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
}

Expand All @@ -292,6 +300,11 @@ private Entry(StreamInput in) throws IOException {
dataStreams = in.readStringList();
source = in.readOptionalWriteable(SnapshotId::new);
clones = in.readMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
remoteStoreIndexShallowCopy = in.readBoolean();
} else {
remoteStoreIndexShallowCopy = false;
}
}

private static boolean assertShardsConsistent(
Expand Down Expand Up @@ -346,7 +359,8 @@ public Entry(
long repositoryStateId,
final Map<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
this(
snapshot,
Expand All @@ -360,7 +374,8 @@ public Entry(
shards,
null,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -385,7 +400,8 @@ public Entry(
shards,
failure,
entry.userMetadata,
version
version,
entry.remoteStoreIndexShallowCopy
);
}

Expand All @@ -409,7 +425,8 @@ public Entry withRepoGen(long newRepoGen) {
userMetadata,
version,
source,
clones
clones,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -431,7 +448,8 @@ public Entry withClones(final Map<RepositoryShardId, ShardSnapshotStatus> update
userMetadata,
version,
source,
updatedClones
updatedClones,
remoteStoreIndexShallowCopy
);
}

Expand Down Expand Up @@ -486,7 +504,8 @@ public Entry fail(final Map<ShardId, ShardSnapshotStatus> shards, State state, S
userMetadata,
version,
source,
clones
clones,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -512,7 +531,8 @@ public Entry withShardStates(final Map<ShardId, ShardSnapshotStatus> shards) {
shards,
failure,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}
return withStartedShards(shards);
Expand All @@ -535,7 +555,8 @@ public Entry withStartedShards(final Map<ShardId, ShardSnapshotStatus> shards) {
shards,
failure,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
assert updated.state().completed() == false && completed(updated.shards().values()) == false
: "Only running snapshots allowed but saw [" + updated + "]";
Expand Down Expand Up @@ -567,6 +588,10 @@ public boolean includeGlobalState() {
return includeGlobalState;
}

public boolean remoteStoreIndexShallowCopy() {
return remoteStoreIndexShallowCopy;
}

public Map<String, Object> userMetadata() {
return userMetadata;
}
Expand Down Expand Up @@ -630,7 +655,7 @@ public boolean equals(Object o) {
if (version.equals(entry.version) == false) return false;
if (Objects.equals(source, ((Entry) o).source) == false) return false;
if (clones.equals(((Entry) o).clones) == false) return false;

if (remoteStoreIndexShallowCopy != entry.remoteStoreIndexShallowCopy) return false;
return true;
}

Expand All @@ -647,6 +672,7 @@ public int hashCode() {
result = 31 * result + version.hashCode();
result = 31 * result + (source == null ? 0 : source.hashCode());
result = 31 * result + clones.hashCode();
result = 31 * result + (remoteStoreIndexShallowCopy ? 1 : 0);
return result;
}

Expand Down Expand Up @@ -710,6 +736,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringCollection(dataStreams);
out.writeOptionalWriteable(source);
out.writeMap(clones, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o));
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(remoteStoreIndexShallowCopy);
}
}

@Override
Expand Down
Loading

0 comments on commit e5ba28c

Please sign in to comment.