Skip to content

Commit

Permalink
Implement patch API for datasources (#2273)
Browse files Browse the repository at this point in the history
* Implement patch API for datasources

Signed-off-by: Derek Ho <dxho@amazon.com>

* Change patch implementation to Map

Signed-off-by: Derek Ho <dxho@amazon.com>

* Fix up, everything complete except unit test

Signed-off-by: Derek Ho <dxho@amazon.com>

* Revise PR to use existing functions

Signed-off-by: Derek Ho <dxho@amazon.com>

* Remove unused utility function

Signed-off-by: Derek Ho <dxho@amazon.com>

* Add tests

Signed-off-by: Derek Ho <dxho@amazon.com>

* Add back line

Signed-off-by: Derek Ho <dxho@amazon.com>

* fix build issue

Signed-off-by: Derek Ho <dxho@amazon.com>

* Fix tests and add in rst

Signed-off-by: Derek Ho <dxho@amazon.com>

* Register patch

Signed-off-by: Derek Ho <dxho@amazon.com>

* Add imports

Signed-off-by: Derek Ho <dxho@amazon.com>

* Patch

Signed-off-by: Derek Ho <dxho@amazon.com>

* Fix integration test

Signed-off-by: Derek Ho <dxho@amazon.com>

* Update IT

Signed-off-by: Derek Ho <dxho@amazon.com>

* Add tests

Signed-off-by: Derek Ho <dxho@amazon.com>

* Fix test

Signed-off-by: Derek Ho <dxho@amazon.com>

* Fix tests and increase code cov

Signed-off-by: Derek Ho <dxho@amazon.com>

* Add more coverage to impl

Signed-off-by: Derek Ho <dxho@amazon.com>

* Fix test and jacoco passing

Signed-off-by: Derek Ho <dxho@amazon.com>

* Test fix

Signed-off-by: Derek Ho <dxho@amazon.com>

* Add docs

Signed-off-by: Derek Ho <dxho@amazon.com>

---------

Signed-off-by: Derek Ho <dxho@amazon.com>
  • Loading branch information
derek-ho authored Oct 18, 2023
1 parent 8f5e01d commit f835112
Show file tree
Hide file tree
Showing 15 changed files with 580 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.datasource;

import java.util.Map;
import java.util.Set;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand Down Expand Up @@ -56,12 +57,19 @@ public interface DataSourceService {
void createDataSource(DataSourceMetadata metadata);

/**
* Updates {@link DataSource} corresponding to dataSourceMetadata.
* Updates {@link DataSource} corresponding to dataSourceMetadata (all fields needed).
*
* @param dataSourceMetadata {@link DataSourceMetadata}.
*/
void updateDataSource(DataSourceMetadata dataSourceMetadata);

/**
* Patches {@link DataSource} corresponding to the given name (only fields to be changed needed).
*
* @param dataSourceData
*/
void patchDataSource(Map<String, Object> dataSourceData);

/**
* Deletes {@link DataSource} corresponding to the DataSource name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ public DataSource getDataSource(String dataSourceName) {
@Override
public void updateDataSource(DataSourceMetadata dataSourceMetadata) {}

@Override
public void patchDataSource(Map<String, Object> dataSourceData) {}

@Override
public void deleteDataSource(String dataSourceName) {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.model.transport;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.CONNECTOR_FIELD;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;

import java.io.IOException;
import java.util.Map;
import lombok.Getter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;

public class PatchDataSourceActionRequest extends ActionRequest {

@Getter private Map<String, Object> dataSourceData;

/** Constructor of UpdateDataSourceActionRequest from StreamInput. */
public PatchDataSourceActionRequest(StreamInput in) throws IOException {
super(in);
}

public PatchDataSourceActionRequest(Map<String, Object> dataSourceData) {
this.dataSourceData = dataSourceData;
}

@Override
public ActionRequestValidationException validate() {
if (this.dataSourceData.get(NAME_FIELD).equals(DEFAULT_DATASOURCE_NAME)) {
ActionRequestValidationException exception = new ActionRequestValidationException();
exception.addValidationError(
"Not allowed to update datasource with name : " + DEFAULT_DATASOURCE_NAME);
return exception;
} else if (this.dataSourceData.get(CONNECTOR_FIELD) != null) {
ActionRequestValidationException exception = new ActionRequestValidationException();
exception.addValidationError("Not allowed to update connector for datasource");
return exception;
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.model.transport;

import java.io.IOException;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

@RequiredArgsConstructor
public class PatchDataSourceActionResponse extends ActionResponse {

@Getter private final String result;

public PatchDataSourceActionResponse(StreamInput in) throws IOException {
super(in);
result = in.readString();
}

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
streamOutput.writeString(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.NOT_FOUND;
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.rest.RestRequest.Method.DELETE;
import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.RestRequest.Method.POST;
import static org.opensearch.rest.RestRequest.Method.PUT;
import static org.opensearch.rest.RestRequest.Method.*;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
Expand All @@ -32,18 +30,8 @@
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.datasources.exceptions.ErrorMessage;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse;
import org.opensearch.sql.datasources.transport.TransportCreateDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportDeleteDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportGetDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportUpdateDataSourceAction;
import org.opensearch.sql.datasources.model.transport.*;
import org.opensearch.sql.datasources.transport.*;
import org.opensearch.sql.datasources.utils.Scheduler;
import org.opensearch.sql.datasources.utils.XContentParserUtils;

Expand Down Expand Up @@ -98,6 +86,17 @@ public List<Route> routes() {
*/
new Route(PUT, BASE_DATASOURCE_ACTION_URL),

/*
* PATCH datasources
* Request body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.PatchDataSourceActionRequest]
* Response body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.PatchDataSourceActionResponse]
*/
new Route(PATCH, BASE_DATASOURCE_ACTION_URL),

/*
* DELETE datasources
* Request body: Ref
Expand All @@ -122,6 +121,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
return executeUpdateRequest(restRequest, nodeClient);
case DELETE:
return executeDeleteRequest(restRequest, nodeClient);
case PATCH:
return executePatchRequest(restRequest, nodeClient);
default:
return restChannel ->
restChannel.sendResponse(
Expand Down Expand Up @@ -216,6 +217,34 @@ public void onFailure(Exception e) {
}));
}

private RestChannelConsumer executePatchRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
Map<String, Object> dataSourceData = XContentParserUtils.toMap(restRequest.contentParser());
return restChannel ->
Scheduler.schedule(
nodeClient,
() ->
nodeClient.execute(
TransportPatchDataSourceAction.ACTION_TYPE,
new PatchDataSourceActionRequest(dataSourceData),
new ActionListener<>() {
@Override
public void onResponse(
PatchDataSourceActionResponse patchDataSourceActionResponse) {
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.OK,
"application/json; charset=UTF-8",
patchDataSourceActionResponse.getResult()));
}

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
}
}));
}

private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) {

String dataSourceName = restRequest.param("dataSourceName");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,11 @@
package org.opensearch.sql.datasources.service;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.*;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
Expand Down Expand Up @@ -100,6 +96,19 @@ public void updateDataSource(DataSourceMetadata dataSourceMetadata) {
}
}

@Override
public void patchDataSource(Map<String, Object> dataSourceData) {
if (!dataSourceData.get(NAME_FIELD).equals(DEFAULT_DATASOURCE_NAME)) {
DataSourceMetadata dataSourceMetadata =
getRawDataSourceMetadata((String) dataSourceData.get(NAME_FIELD));
replaceOldDatasourceMetadata(dataSourceData, dataSourceMetadata);
updateDataSource(dataSourceMetadata);
} else {
throw new UnsupportedOperationException(
"Not allowed to update default datasource :" + DEFAULT_DATASOURCE_NAME);
}
}

@Override
public void deleteDataSource(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
Expand Down Expand Up @@ -136,6 +145,35 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) {
+ " Properties are required parameters.");
}

/**
* Replaces the fields in the map of the given metadata.
*
* @param dataSourceData
* @param metadata {@link DataSourceMetadata}.
*/
private void replaceOldDatasourceMetadata(
Map<String, Object> dataSourceData, DataSourceMetadata metadata) {

for (String key : dataSourceData.keySet()) {
switch (key) {
// Name and connector should not be modified
case DESCRIPTION_FIELD:
metadata.setDescription((String) dataSourceData.get(DESCRIPTION_FIELD));
break;
case ALLOWED_ROLES_FIELD:
metadata.setAllowedRoles((List<String>) dataSourceData.get(ALLOWED_ROLES_FIELD));
break;
case PROPERTIES_FIELD:
Map<String, String> properties = new HashMap<>(metadata.getProperties());
properties.putAll(((Map<String, String>) dataSourceData.get(PROPERTIES_FIELD)));
break;
case NAME_FIELD:
case CONNECTOR_FIELD:
break;
}
}
}

@Override
public DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.transport;

import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;
import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

public class TransportPatchDataSourceAction
extends HandledTransportAction<PatchDataSourceActionRequest, PatchDataSourceActionResponse> {

public static final String NAME = "cluster:admin/opensearch/ql/datasources/patch";
public static final ActionType<PatchDataSourceActionResponse> ACTION_TYPE =
new ActionType<>(NAME, PatchDataSourceActionResponse::new);

private DataSourceService dataSourceService;

/**
* TransportPatchDataSourceAction action for updating datasource.
*
* @param transportService transportService.
* @param actionFilters actionFilters.
* @param dataSourceService dataSourceService.
*/
@Inject
public TransportPatchDataSourceAction(
TransportService transportService,
ActionFilters actionFilters,
DataSourceServiceImpl dataSourceService) {
super(
TransportPatchDataSourceAction.NAME,
transportService,
actionFilters,
PatchDataSourceActionRequest::new);
this.dataSourceService = dataSourceService;
}

@Override
protected void doExecute(
Task task,
PatchDataSourceActionRequest request,
ActionListener<PatchDataSourceActionResponse> actionListener) {
try {
dataSourceService.patchDataSource(request.getDataSourceData());
String responseContent =
new JsonResponseFormatter<String>(PRETTY) {
@Override
protected Object buildJsonObject(String response) {
return response;
}
}.format("Updated DataSource with name " + request.getDataSourceData().get(NAME_FIELD));
actionListener.onResponse(new PatchDataSourceActionResponse(responseContent));
} catch (Exception e) {
actionListener.onFailure(e);
}
}
}
Loading

0 comments on commit f835112

Please sign in to comment.