Skip to content

Commit

Permalink
Fixing tests
Browse files Browse the repository at this point in the history
Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
  • Loading branch information
psychbot authored and Sachin Kale committed Aug 31, 2023
1 parent 2956dd1 commit 496c00d
Show file tree
Hide file tree
Showing 10 changed files with 570 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,36 @@ public RemoteStoreNode(DiscoveryNode node) {
private String validateAttributeNonNull(String attributeKey) {
String attributeValue = node.getAttributes().get(attributeKey);
if (attributeValue == null || attributeValue.isEmpty()) {
throw new IllegalStateException("joining node [" + this + "] doesn't have the node attribute [" + attributeKey + "].");
throw new IllegalStateException("joining node [" + this.node + "] doesn't have the node attribute [" + attributeKey + "]");
}

return attributeValue;
}

private Map<String, String> validateSettingsAttributesNonNull(String settingsAttributeKeyPrefix) {
return node.getAttributes()
private Map<String, String> validateSettingsAttributesNonNull(String repositoryName) {
String settingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
repositoryName
);
Map<String, String> settingsMap = node.getAttributes()
.keySet()
.stream()
.filter(key -> key.startsWith(settingsAttributeKeyPrefix))
.collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> validateAttributeNonNull(key)));

if (settingsMap.isEmpty()) {
throw new IllegalStateException(
"joining node [" + this.node + "] doesn't have settings attribute for [" + repositoryName + "] repository"
);
}

return settingsMap;
}

private RepositoryMetadata buildRepositoryMetadata(String name) {
String type = validateAttributeNonNull(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));
Map<String, String> settingsMap = validateSettingsAttributesNonNull(
String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name)
);
Map<String, String> settingsMap = validateSettingsAttributesNonNull(name);

Settings.Builder settings = Settings.builder();
settingsMap.forEach(settings::put);
Expand All @@ -74,8 +85,8 @@ private RepositoryMetadata buildRepositoryMetadata(String name) {
}

private RepositoriesMetadata buildRepositoriesMetadata() {
String segmentRepositoryName = node.getAttributes().get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
String translogRepositoryName = node.getAttributes().get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
String segmentRepositoryName = validateAttributeNonNull(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
String translogRepositoryName = validateAttributeNonNull(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
if (segmentRepositoryName.equals(translogRepositoryName)) {
return new RepositoriesMetadata(Collections.singletonList(buildRepositoryMetadata(segmentRepositoryName)));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public Coordinator(
allocationService,
clusterManagerService,
transportService,
remoteStoreService,
this::getCurrentTerm,
this::getStateForClusterManagerService,
this::handleJoinRequest,
Expand All @@ -226,8 +227,7 @@ public Coordinator(
rerouteService,
nodeHealthService,
this::onNodeCommissionStatusChange,
namedWriteableRegistry,
remoteStoreService
namedWriteableRegistry
);
this.persistedStateSupplier = persistedStateSupplier;
this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public class JoinHelper {
AllocationService allocationService,
ClusterManagerService clusterManagerService,
TransportService transportService,
RemoteStoreService remoteStoreService,
LongSupplier currentTermSupplier,
Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler,
Expand All @@ -144,8 +145,7 @@ public class JoinHelper {
RerouteService rerouteService,
NodeHealthService nodeHealthService,
Consumer<Boolean> nodeCommissioned,
NamedWriteableRegistry namedWriteableRegistry,
RemoteStoreService remoteStoreService
NamedWriteableRegistry namedWriteableRegistry
) {
this.clusterManagerService = clusterManagerService;
this.transportService = transportService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,8 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
* A remote store node is the one which holds all the remote store attributes and a remote store cluster is
* the one which has only homogeneous remote store nodes with same node attributes
*
* TODO: When we support migration from remote store cluster to non remote store and vice versa the migration
* setting {@link RemoteStoreService::REMOTE_STORE_MIGRATION_SETTING} will be help determine if a non
* remote store node is allowed to join the remote store cluster and vice versa.
* TODO: When we support moving from remote store cluster to non remote store and vice versa the this logic will
* needs to be modified.
*/
public static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, ClusterState currentState) {
List<DiscoveryNode> existingNodes = new ArrayList<>(currentState.getNodes().getNodes().values());
Expand All @@ -487,18 +486,18 @@ public static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]."
+ "]"
);
}
} else {
throw new IllegalStateException(
"a remote store node [" + joiningNode + "] is trying to join a non remote store cluster."
"a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"
);
}
} else {
if (existingNode.isRemoteStoreNode()) {
throw new IllegalStateException(
"a non remote store node [" + joiningNode + "] is trying to join a remote store cluster."
"a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,27 @@
import org.apache.logging.log4j.Level;
import org.opensearch.Version;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.admin.cluster.remotestore.RemoteStoreService;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.test.transport.CapturingTransport.CapturedRequest;
import org.opensearch.test.transport.MockTransport;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.BytesTransportRequest;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.TransportException;
Expand All @@ -73,6 +78,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.mock;

public class JoinHelperTests extends OpenSearchTestCase {
private final NamedWriteableRegistry namedWriteableRegistry = DEFAULT_NAMED_WRITABLE_REGISTRY;
Expand All @@ -97,6 +103,7 @@ public void testJoinDeduplication() {
null,
null,
transportService,
buildRemoteStoreService(transportService, deterministicTaskQueue.getThreadPool()),
() -> 0L,
() -> null,
(joinRequest, joinCallback) -> {
Expand Down Expand Up @@ -282,6 +289,7 @@ public void testJoinFailureOnUnhealthyNodes() {
null,
null,
transportService,
buildRemoteStoreService(transportService, deterministicTaskQueue.getThreadPool()),
() -> 0L,
() -> null,
(joinRequest, joinCallback) -> {
Expand Down Expand Up @@ -481,6 +489,7 @@ private TestClusterSetup getTestClusterSetup(Version version, boolean isCapturin
null,
null,
transportService,
buildRemoteStoreService(transportService, deterministicTaskQueue.getThreadPool()),
() -> 0L,
() -> localClusterState,
(joinRequest, joinCallback) -> {
Expand All @@ -500,6 +509,18 @@ private TestClusterSetup getTestClusterSetup(Version version, boolean isCapturin
return new TestClusterSetup(deterministicTaskQueue, localNode, transportService, localClusterState, joinHelper, capturingTransport);
}

private RemoteStoreService buildRemoteStoreService(TransportService transportService, ThreadPool threadPool) {
RepositoriesService repositoriesService = new RepositoriesService(
Settings.EMPTY,
mock(ClusterService.class),
transportService,
Collections.emptyMap(),
Collections.emptyMap(),
threadPool
);
return new RemoteStoreService(new SetOnce<>(repositoriesService)::get, threadPool);
}

private static class TestClusterSetup {
public final DeterministicTaskQueue deterministicTaskQueue;
public final DiscoveryNode localNode;
Expand Down
Loading

0 comments on commit 496c00d

Please sign in to comment.