Skip to content

Commit

Permalink
Add Update QueryGroup API Logic
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <mariazrr@amazon.com>
  • Loading branch information
ruai0511 committed Jul 16, 2024
1 parent 29a3e2c commit 7abcbc9
Show file tree
Hide file tree
Showing 22 changed files with 1,567 additions and 5 deletions.
18 changes: 18 additions & 0 deletions plugins/workload-management/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

opensearchplugin {
description 'OpenSearch Workload Management Plugin.'
classname 'org.opensearch.plugin.wlm.WorkloadManagementPlugin'
}

dependencies {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.wlm.service.Persistable;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

/**
* Transport action for update QueryGroup
*
* @opensearch.internal
*/
public class TransportUpdateQueryGroupAction extends HandledTransportAction<UpdateQueryGroupRequest, UpdateQueryGroupResponse> {

private final ThreadPool threadPool;
private final Persistable<QueryGroup> queryGroupPersistenceService;

/**
* Constructor for TransportUpdateQueryGroupAction
*
* @param actionName - action name
* @param transportService - a {@link TransportService} object
* @param actionFilters - a {@link ActionFilters} object
* @param threadPool - a {@link ThreadPool} object
* @param queryGroupPersistenceService - a {@link Persistable} object
*/
@Inject
public TransportUpdateQueryGroupAction(
String actionName,
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
Persistable<QueryGroup> queryGroupPersistenceService
) {
super(UpdateQueryGroupAction.NAME, transportService, actionFilters, UpdateQueryGroupRequest::new);
this.threadPool = threadPool;
this.queryGroupPersistenceService = queryGroupPersistenceService;
}

@Override
protected void doExecute(Task task, UpdateQueryGroupRequest request, ActionListener<UpdateQueryGroupResponse> listener) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> queryGroupPersistenceService.update(request, listener));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm;

import org.opensearch.action.ActionType;

/**
* Transport action to update QueryGroup
*
* @opensearch.api
*/
public class UpdateQueryGroupAction extends ActionType<UpdateQueryGroupResponse> {

/**
* An instance of UpdateQueryGroupAction
*/
public static final UpdateQueryGroupAction INSTANCE = new UpdateQueryGroupAction();

/**
* Name for UpdateQueryGroupAction
*/
public static final String NAME = "cluster:admin/opensearch/wlm/query_group/_update";

/**
* Default constructor
*/
private UpdateQueryGroupAction() {
super(NAME, UpdateQueryGroupResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.ResourceType;
import org.joda.time.Instant;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* A request for update QueryGroup
*
* @opensearch.internal
*/
public class UpdateQueryGroupRequest extends ActionRequest implements Writeable.Reader<UpdateQueryGroupRequest> {
String name;
Map<ResourceType, Object> resourceLimits;
private ResiliencyMode resiliencyMode;
long updatedAtInMillis;

/**
* Default constructor for UpdateQueryGroupRequest
*/
public UpdateQueryGroupRequest() {}

/**
* Constructor for UpdateQueryGroupRequest
* @param queryGroup - A {@link QueryGroup} object
*/
public UpdateQueryGroupRequest(QueryGroup queryGroup) {
this.name = queryGroup.getName();
this.resiliencyMode = queryGroup.getResiliencyMode();
this.resourceLimits = queryGroup.getResourceLimits();
this.updatedAtInMillis = queryGroup.getUpdatedAtInMillis();
}

/**
* Constructor for UpdateQueryGroupRequest
* @param name - QueryGroup name for UpdateQueryGroupRequest
* @param resiliencyMode - QueryGroup mode for UpdateQueryGroupRequest
* @param resourceLimits - QueryGroup resourceLimits for UpdateQueryGroupRequest
* @param updatedAtInMillis - QueryGroup updated time in millis for UpdateQueryGroupRequest
*/
public UpdateQueryGroupRequest(
String name,
ResiliencyMode resiliencyMode,
Map<ResourceType, Object> resourceLimits,
long updatedAtInMillis
) {
this.name = name;
this.resiliencyMode = resiliencyMode;
this.resourceLimits = resourceLimits;
this.updatedAtInMillis = updatedAtInMillis;
}

/**
* Constructor for UpdateQueryGroupRequest
* @param in - A {@link StreamInput} object
*/
public UpdateQueryGroupRequest(StreamInput in) throws IOException {
super(in);
name = in.readString();
if (in.readBoolean()) {
resourceLimits = in.readMap((i) -> ResourceType.fromName(i.readString()), StreamInput::readGenericValue);
} else {
resourceLimits = new HashMap<>();
}
if (in.readBoolean()) {
resiliencyMode = ResiliencyMode.fromName(in.readString());
}
updatedAtInMillis = in.readLong();
}

@Override
public UpdateQueryGroupRequest read(StreamInput in) throws IOException {
return new UpdateQueryGroupRequest(in);
}

/**
* Generate a UpdateQueryGroupRequest from XContent
* @param parser - A {@link XContentParser} object
* @param name - name of the QueryGroup to be updated
*/
public static UpdateQueryGroupRequest fromXContent(XContentParser parser, String name) throws IOException {
while (parser.currentToken() != XContentParser.Token.START_OBJECT) {
parser.nextToken();
}

if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("expected start object but got a " + parser.currentToken());
}

XContentParser.Token token;
String fieldName = "";
ResiliencyMode mode = null;

// Map to hold resources
final Map<ResourceType, Object> resourceLimits = new HashMap<>();
while ((token = parser.nextToken()) != null) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else if (token.isValue()) {
if (fieldName.equals("resiliency_mode")) {
mode = ResiliencyMode.fromName(parser.text());
} else {
throw new IllegalArgumentException("unrecognised [field=" + fieldName + " in QueryGroup");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (!fieldName.equals("resourceLimits")) {
throw new IllegalArgumentException(
"QueryGroup.resourceLimits is an object and expected token was { " + " but found " + token
);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else {
resourceLimits.put(ResourceType.fromName(fieldName), parser.doubleValue());
}
}
}
}
return new UpdateQueryGroupRequest(name, mode, resourceLimits, Instant.now().getMillis());
}

@Override
public ActionRequestValidationException validate() {
return null;
}

/**
* name getter
*/
public String getName() {
return name;
}

/**
* name setter
* @param name - name to be set
*/
public void setName(String name) {
this.name = name;
}

/**
* ResourceLimits getter
*/
public Map<ResourceType, Object> getResourceLimits() {
return resourceLimits;
}

/**
* ResourceLimits setter
* @param resourceLimits - ResourceLimit to be set
*/
public void setResourceLimits(Map<ResourceType, Object> resourceLimits) {
this.resourceLimits = resourceLimits;
}

/**
* resiliencyMode getter
*/
public ResiliencyMode getResiliencyMode() {
return resiliencyMode;
}

/**
* resiliencyMode setter
* @param resiliencyMode - mode to be set
*/
public void setResiliencyMode(ResiliencyMode resiliencyMode) {
this.resiliencyMode = resiliencyMode;
}

/**
* updatedAtInMillis getter
*/
public long getUpdatedAtInMillis() {
return updatedAtInMillis;
}

/**
* updatedAtInMillis setter
* @param updatedAtInMillis - updatedAtInMillis to be set
*/
public void setUpdatedAtInMillis(long updatedAtInMillis) {
this.updatedAtInMillis = updatedAtInMillis;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
if (resourceLimits == null || resourceLimits.isEmpty()) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeGenericValue);
}
if (resiliencyMode == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeString(resiliencyMode.getName());
}
out.writeLong(updatedAtInMillis);
}
}
Loading

0 comments on commit 7abcbc9

Please sign in to comment.