Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <mariazrr@amazon.com>
  • Loading branch information
ruai0511 committed Aug 31, 2024
1 parent e1eae4f commit 12c2b51
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ public class CreateQueryGroupRequest extends ActionRequest {
* Constructor for CreateQueryGroupRequest
* @param queryGroup - A {@link QueryGroup} object
*/
public CreateQueryGroupRequest(QueryGroup queryGroup) {
CreateQueryGroupRequest(QueryGroup queryGroup) {
this.queryGroup = queryGroup;
}

/**
* Constructor for CreateQueryGroupRequest
* @param in - A {@link StreamInput} object
*/
public CreateQueryGroupRequest(StreamInput in) throws IOException {
CreateQueryGroupRequest(StreamInput in) throws IOException {
super(in);
queryGroup = new QueryGroup(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.wlm.ChangeableQueryGroup;
import org.opensearch.wlm.ChangeableQueryGroup.ResiliencyMode;
import org.opensearch.wlm.ResourceType;
import org.opensearch.wlm.MutableQueryGroupFragment;

import java.io.IOException;
import java.util.Map;

/**
* A request for update QueryGroup
Expand All @@ -28,24 +25,24 @@
*/
public class UpdateQueryGroupRequest extends ActionRequest {
private final String name;
private final ChangeableQueryGroup changeableQueryGroup;
private final MutableQueryGroupFragment mutableQueryGroupFragment;

/**
* Constructor for UpdateQueryGroupRequest
* @param name - QueryGroup name for UpdateQueryGroupRequest
* @param changeableQueryGroup - ChangeableQueryGroup for UpdateQueryGroupRequest
* @param mutableQueryGroupFragment - MutableQueryGroupFragment for UpdateQueryGroupRequest
*/
public UpdateQueryGroupRequest(String name, ChangeableQueryGroup changeableQueryGroup) {
UpdateQueryGroupRequest(String name, MutableQueryGroupFragment mutableQueryGroupFragment) {
this.name = name;
this.changeableQueryGroup = changeableQueryGroup;
this.mutableQueryGroupFragment = mutableQueryGroupFragment;
}

/**
* Constructor for UpdateQueryGroupRequest
* @param in - A {@link StreamInput} object
*/
public UpdateQueryGroupRequest(StreamInput in) throws IOException {
this(in.readString(), new ChangeableQueryGroup(in));
UpdateQueryGroupRequest(StreamInput in) throws IOException {
this(in.readString(), new MutableQueryGroupFragment(in));
}

/**
Expand All @@ -55,7 +52,7 @@ public UpdateQueryGroupRequest(StreamInput in) throws IOException {
*/
public static UpdateQueryGroupRequest fromXContent(XContentParser parser, String name) throws IOException {
QueryGroup.Builder builder = QueryGroup.Builder.fromXContent(parser);
return new UpdateQueryGroupRequest(name, builder.getChangeableQueryGroup());
return new UpdateQueryGroupRequest(name, builder.getMutableQueryGroupFragment());
}

@Override
Expand All @@ -72,29 +69,15 @@ public String getName() {
}

/**
* resourceLimits getter
* mutableQueryGroupFragment getter
*/
public Map<ResourceType, Double> getResourceLimits() {
return getChangeableQueryGroup().getResourceLimits();
}

/**
* resiliencyMode getter
*/
public ResiliencyMode getResiliencyMode() {
return getChangeableQueryGroup().getResiliencyMode();
}

/**
* changeableQueryGroup getter
*/
public ChangeableQueryGroup getChangeableQueryGroup() {
return changeableQueryGroup;
public MutableQueryGroupFragment getmMutableQueryGroupFragment() {
return mutableQueryGroupFragment;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
changeableQueryGroup.writeTo(out);
mutableQueryGroupFragment.writeTo(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,17 @@
import org.opensearch.plugin.wlm.action.DeleteQueryGroupRequest;
import org.opensearch.plugin.wlm.action.UpdateQueryGroupRequest;
import org.opensearch.plugin.wlm.action.UpdateQueryGroupResponse;
import org.opensearch.wlm.ChangeableQueryGroup;
import org.opensearch.wlm.ChangeableQueryGroup.ResiliencyMode;
import org.opensearch.wlm.MutableQueryGroupFragment;
import org.opensearch.wlm.ResourceType;
import org.joda.time.Instant;

import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.QueryGroup.updateExistingQueryGroup;

/**
* This class defines the functions for QueryGroup persistence
*/
Expand Down Expand Up @@ -296,31 +295,22 @@ ClusterState updateQueryGroupInClusterState(UpdateQueryGroupRequest updateQueryG
final Metadata metadata = currentState.metadata();
final Map<String, QueryGroup> existingGroups = currentState.metadata().queryGroups();
String name = updateQueryGroupRequest.getName();
MutableQueryGroupFragment mutableQueryGroupFragment = updateQueryGroupRequest.getmMutableQueryGroupFragment();

final QueryGroup existingGroup = existingGroups.values()
.stream()
.filter(group -> group.getName().equals(name))
.findFirst()
.orElseThrow(() -> new ResourceNotFoundException("No QueryGroup exists with the provided name: " + name));

// build the QueryGroup with updated fields
final Map<ResourceType, Double> updatedResourceLimits = new HashMap<>(existingGroup.getResourceLimits());
if (updateQueryGroupRequest.getResourceLimits() != null && !updateQueryGroupRequest.getResourceLimits().isEmpty()) {
validateTotalUsage(existingGroups, name, updateQueryGroupRequest.getResourceLimits());
updatedResourceLimits.putAll(updateQueryGroupRequest.getResourceLimits());
}

final ResiliencyMode mode = Optional.ofNullable(updateQueryGroupRequest.getResiliencyMode())
.orElse(existingGroup.getResiliencyMode());

final QueryGroup updatedGroup = new QueryGroup(
name,
existingGroup.get_id(),
new ChangeableQueryGroup(mode, updatedResourceLimits),
Instant.now().getMillis()
);
validateTotalUsage(existingGroups, name, mutableQueryGroupFragment.getResourceLimits());
return ClusterState.builder(currentState)
.metadata(Metadata.builder(metadata).remove(existingGroup).put(updatedGroup).build())
.metadata(
Metadata.builder(metadata)
.remove(existingGroup)
.put(updateExistingQueryGroup(existingGroup, mutableQueryGroupFragment))
.build()
)
.build();
}

Expand All @@ -330,6 +320,9 @@ ClusterState updateQueryGroupInClusterState(UpdateQueryGroupRequest updateQueryG
* @param resourceLimits - the QueryGroup we're creating or updating
*/
private void validateTotalUsage(Map<String, QueryGroup> existingQueryGroups, String name, Map<ResourceType, Double> resourceLimits) {
if (resourceLimits == null || resourceLimits.isEmpty()) {
return;
}
final Map<ResourceType, Double> totalUsage = new EnumMap<>(ResourceType.class);
totalUsage.putAll(resourceLimits);
for (QueryGroup currGroup : existingQueryGroups.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.ChangeableQueryGroup;
import org.opensearch.wlm.MutableQueryGroupFragment;
import org.opensearch.wlm.ResourceType;

import java.util.ArrayList;
Expand All @@ -48,13 +48,17 @@ public class QueryGroupTestUtils {
public static final long TIMESTAMP_TWO = 4513232415L;
public static final QueryGroup queryGroupOne = builder().name(NAME_ONE)
._id(_ID_ONE)
.changeableQueryGroup(new ChangeableQueryGroup(ChangeableQueryGroup.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.3)))
.mutableQueryGroupFragment(
new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.3))
)
.updatedAt(TIMESTAMP_ONE)
.build();

public static final QueryGroup queryGroupTwo = builder().name(NAME_TWO)
._id(_ID_TWO)
.changeableQueryGroup(new ChangeableQueryGroup(ChangeableQueryGroup.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.6)))
.mutableQueryGroupFragment(
new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.6))
)
.updatedAt(TIMESTAMP_TWO)
.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.action;

import org.opensearch.wlm.MutableQueryGroupFragment;

public class QueryGroupActionTestUtils {
public static UpdateQueryGroupRequest updateQueryGroupRequest(String name, MutableQueryGroupFragment mutableQueryGroupFragment) {
return new UpdateQueryGroupRequest(name, mutableQueryGroupFragment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.wlm.ChangeableQueryGroup;
import org.opensearch.wlm.ChangeableQueryGroup.ResiliencyMode;
import org.opensearch.wlm.MutableQueryGroupFragment;
import org.opensearch.wlm.MutableQueryGroupFragment.ResiliencyMode;
import org.opensearch.wlm.ResourceType;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.plugin.wlm.QueryGroupTestUtils.NAME_ONE;
import static org.opensearch.plugin.wlm.QueryGroupTestUtils.assertEqualResourceLimits;
import static org.opensearch.plugin.wlm.QueryGroupTestUtils.queryGroupOne;

public class UpdateQueryGroupRequestTests extends OpenSearchTestCase {
Expand All @@ -29,29 +28,26 @@ public class UpdateQueryGroupRequestTests extends OpenSearchTestCase {
* Test case to verify the serialization and deserialization of UpdateQueryGroupRequest.
*/
public void testSerialization() throws IOException {
UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(NAME_ONE, queryGroupOne.getChangeableQueryGroup());
UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(NAME_ONE, queryGroupOne.getMutableQueryGroupFragment());
BytesStreamOutput out = new BytesStreamOutput();
request.writeTo(out);
StreamInput streamInput = out.bytes().streamInput();
UpdateQueryGroupRequest otherRequest = new UpdateQueryGroupRequest(streamInput);
assertEquals(request.getName(), otherRequest.getName());
assertEquals(request.getResourceLimits().size(), otherRequest.getResourceLimits().size());
assertEquals(request.getResiliencyMode(), otherRequest.getResiliencyMode());
assertEqualResourceLimits(request.getResourceLimits(), otherRequest.getResourceLimits());
assertEquals(request.getmMutableQueryGroupFragment(), otherRequest.getmMutableQueryGroupFragment());
}

/**
* Test case to verify the serialization and deserialization of UpdateQueryGroupRequest with only name field.
*/
public void testSerializationOnlyName() throws IOException {
UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(NAME_ONE, new ChangeableQueryGroup(null, new HashMap<>()));
UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(NAME_ONE, new MutableQueryGroupFragment(null, new HashMap<>()));
BytesStreamOutput out = new BytesStreamOutput();
request.writeTo(out);
StreamInput streamInput = out.bytes().streamInput();
UpdateQueryGroupRequest otherRequest = new UpdateQueryGroupRequest(streamInput);
assertEquals(request.getName(), otherRequest.getName());
assertEquals(request.getResourceLimits(), otherRequest.getResourceLimits());
assertEquals(request.getResiliencyMode(), otherRequest.getResiliencyMode());
assertEquals(request.getmMutableQueryGroupFragment(), otherRequest.getmMutableQueryGroupFragment());
}

/**
Expand All @@ -60,16 +56,14 @@ public void testSerializationOnlyName() throws IOException {
public void testSerializationOnlyResourceLimit() throws IOException {
UpdateQueryGroupRequest request = new UpdateQueryGroupRequest(
NAME_ONE,
new ChangeableQueryGroup(null, Map.of(ResourceType.MEMORY, 0.4))
new MutableQueryGroupFragment(null, Map.of(ResourceType.MEMORY, 0.4))
);
BytesStreamOutput out = new BytesStreamOutput();
request.writeTo(out);
StreamInput streamInput = out.bytes().streamInput();
UpdateQueryGroupRequest otherRequest = new UpdateQueryGroupRequest(streamInput);
assertEquals(request.getName(), otherRequest.getName());
assertEquals(request.getResourceLimits().size(), otherRequest.getResourceLimits().size());
assertEqualResourceLimits(request.getResourceLimits(), otherRequest.getResourceLimits());
assertEquals(request.getResiliencyMode(), otherRequest.getResiliencyMode());
assertEquals(request.getmMutableQueryGroupFragment(), otherRequest.getmMutableQueryGroupFragment());
}

/**
Expand All @@ -80,7 +74,10 @@ public void testInvalidResourceLimitList() {
IllegalArgumentException.class,
() -> new UpdateQueryGroupRequest(
NAME_ONE,
new ChangeableQueryGroup(ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.3, ResourceType.fromName("random"), 0.4))
new MutableQueryGroupFragment(
ResiliencyMode.MONITOR,
Map.of(ResourceType.MEMORY, 0.3, ResourceType.fromName("random"), 0.4)
)
)
);
}
Expand All @@ -93,7 +90,7 @@ public void testInvalidEnforcement() {
IllegalArgumentException.class,
() -> new UpdateQueryGroupRequest(
NAME_ONE,
new ChangeableQueryGroup(ResiliencyMode.fromName("random"), Map.of(ResourceType.fromName("memory"), 0.3))
new MutableQueryGroupFragment(ResiliencyMode.fromName("random"), Map.of(ResourceType.fromName("memory"), 0.3))
)
);
}
Expand Down
Loading

0 comments on commit 12c2b51

Please sign in to comment.