merge from 2.x
Signed-off-by: Peng Huo <penghuo@gmail.com>
penghuo committed Oct 30, 2023
1 parent 6e8d9dc commit 34a8917
Showing 11 changed files with 62 additions and 353 deletions.
```diff
@@ -335,7 +335,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
         new FlintIndexMetadataReaderImpl(client),
         client,
         new SessionManager(stateStore, emrServerlessClient, pluginSettings),
-        new DefaultLeaseManager(pluginSettings, stateStore));
+        new DefaultLeaseManager(pluginSettings, stateStore),
+        stateStore);
     return new AsyncQueryExecutorServiceImpl(
         asyncQueryJobMetadataStorageService,
         sparkQueryDispatcher,
```
```diff
@@ -81,6 +81,6 @@ public DispatchQueryResponse submit(
             dataSourceMetadata.getResultIndex());
     String jobId = emrServerlessClient.startJobRun(startJobRequest);
     return new DispatchQueryResponse(
-        context.getQueryId(), jobId, false, dataSourceMetadata.getResultIndex(), null);
+        context.getQueryId(), jobId, dataSourceMetadata.getResultIndex(), null);
   }
 }
```
```diff
@@ -5,11 +5,9 @@
 
 package org.opensearch.sql.spark.dispatcher;
 
-import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME;
 import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;
 
 import com.amazonaws.services.emrserverless.model.JobRunState;
-import java.util.Optional;
 import lombok.RequiredArgsConstructor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -21,6 +19,7 @@
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
 import org.opensearch.sql.spark.client.EMRServerlessClient;
+import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
 import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
@@ -58,11 +57,11 @@ public static boolean isIndexDMLQuery(String jobId) {
     return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId);
   }
 
-  public DispatchQueryResponse handle(
-      DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexDetails) {
-    DataSourceMetadata dataSourceMetadata =
-        dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
-    dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
+  @Override
+  public DispatchQueryResponse submit(
+      DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
+    DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
+    IndexQueryDetails indexDetails = context.getIndexQueryDetails();
     FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
     // if index is created without auto refresh. there is no job to cancel.
     String status = JobRunState.FAILED.toString();
@@ -92,9 +91,7 @@ public DispatchQueryResponse handle(
         dispatchQueryRequest.getDatasource(),
         System.currentTimeMillis() - startTime,
         System.currentTimeMillis());
-    String resultIndex =
-        Optional.ofNullable(dataSourceMetadata.getResultIndex())
-            .orElse(SPARK_RESPONSE_BUFFER_INDEX_NAME);
+    String resultIndex = dataSourceMetadata.getResultIndex();
     createIndexDMLResult(stateStore, resultIndex).apply(indexDMLResult);
 
     return new DispatchQueryResponse(asyncQueryId, DROP_INDEX_JOB_ID, resultIndex, null);
```
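Two things are notable in this hunk: `handle` becomes an `@Override` of the shared `submit(DispatchQueryRequest, DispatchQueryContext)` contract, so the dispatcher can treat index DML like any other query type, and the result index now comes straight from the datasource metadata instead of falling back to the shared Spark response buffer. The `createIndexDMLResult(stateStore, resultIndex).apply(indexDMLResult)` line uses StateStore's bind-now, write-later idiom; a self-contained sketch of that idiom, with stand-ins for the real `StateStore` and `IndexDMLResult` types:

```java
import java.util.function.Function;

// Sketch only: the real createIndexDMLResult lives on StateStore and persists an
// IndexDMLResult document; strings stand in for both here.
final class StateStoreSketch {

  static Function<String, String> createIndexDMLResult(StateStoreSketch store, String resultIndex) {
    // Capture the destination index now; persist when the returned function is applied.
    return result -> store.write(resultIndex, result);
  }

  String write(String index, String doc) {
    System.out.printf("writing %s to %s%n", doc, index);
    return doc;
  }

  public static void main(String[] args) {
    StateStoreSketch store = new StateStoreSketch();
    createIndexDMLResult(store, "query_execution_result_mys3")
        .apply("{\"status\":\"SUCCESS\"}");
  }
}
```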
```diff
@@ -109,7 +109,6 @@ public DispatchQueryResponse submit(
     return new DispatchQueryResponse(
         context.getQueryId(),
         session.getSessionModel().getJobId(),
-        false,
         dataSourceMetadata.getResultIndex(),
         session.getSessionId().getSessionId());
   }
```
```diff
@@ -5,27 +5,12 @@
 
 package org.opensearch.sql.spark.dispatcher;
 
-import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SESSION_CLASS_NAME;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
-
-import com.amazonaws.services.emrserverless.model.JobRunState;
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.json.JSONArray;
 import org.json.JSONObject;
-import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.opensearch.action.support.master.AcknowledgedResponse;
 import org.opensearch.client.Client;
 import org.opensearch.sql.datasource.DataSourceService;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
@@ -40,7 +25,6 @@
 import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
 import org.opensearch.sql.spark.execution.session.SessionManager;
 import org.opensearch.sql.spark.execution.statestore.StateStore;
-import org.opensearch.sql.spark.flint.FlintIndexMetadata;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
 import org.opensearch.sql.spark.leasemanager.LeaseManager;
 import org.opensearch.sql.spark.response.JobExecutionResponseReader;
@@ -99,8 +83,7 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
       contextBuilder.indexQueryDetails(indexQueryDetails);
 
       if (IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())) {
-        // todo, fix in DROP INDEX PR.
-        return handleDropIndexQuery(dispatchQueryRequest, indexQueryDetails);
+        asyncQueryHandler = createIndexDMLHandler();
       } else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType())
           && indexQueryDetails.isAutoRefresh()) {
         asyncQueryHandler =
@@ -128,9 +111,8 @@ public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata)
   public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
     AsyncQueryHandler queryHandler;
     if (asyncQueryJobMetadata.getSessionId() != null) {
-      return new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager)
-          .cancelJob(asyncQueryJobMetadata);
-      queryHandler = new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader);
+      queryHandler =
+          new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager);
     } else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) {
       queryHandler = createIndexDMLHandler();
     } else {
@@ -164,40 +146,6 @@ private static void fillMissingDetails(
     }
   }
 
-  private DispatchQueryResponse handleDropIndexQuery(
-      DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
-    DataSourceMetadata dataSourceMetadata =
-        this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
-    FlintIndexMetadata indexMetadata =
-        flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails);
-    // if index is created without auto refresh. there is no job to cancel.
-    String status = JobRunState.FAILED.toString();
-    try {
-      if (indexMetadata.isAutoRefresh()) {
-        emrServerlessClient.cancelJobRun(
-            dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId());
-      }
-    } finally {
-      String indexName = indexQueryDetails.openSearchIndexName();
-      try {
-        AcknowledgedResponse response =
-            client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
-        if (!response.isAcknowledged()) {
-          LOG.error("failed to delete index");
-        }
-        status = JobRunState.SUCCESS.toString();
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.error("failed to delete index");
-      }
-    }
-    return new DispatchQueryResponse(
-        AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
-        new DropIndexResult(status).toJobId(),
-        true,
-        dataSourceMetadata.getResultIndex(),
-        null);
-  }
-
   private static Map<String, String> getDefaultTagsForJobSubmission(
       DispatchQueryRequest dispatchQueryRequest) {
     Map<String, String> tags = new HashMap<>();
```
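With `handleDropIndexQuery` deleted, both `dispatch` and `cancelJob` reduce to picking an `AsyncQueryHandler` and delegating; the inline EMR-S job cancellation and index deletion presumably now live behind `createIndexDMLHandler()`. A self-contained sketch of the selection pattern — the lambdas are simplified stand-ins for `InteractiveQueryHandler`, `IndexDMLHandler`, and the default batch handler, and the constant's value is an assumption (the diff shows only its name):

```java
// Sketch of the handler selection the dispatcher converges on in this commit.
interface AsyncQueryHandler {
  String cancelJob(String jobId);
}

final class DispatcherSketch {
  static final String DROP_INDEX_JOB_ID = "dropIndexJobId"; // value assumed

  static AsyncQueryHandler pickHandler(String sessionId, String jobId) {
    if (sessionId != null) {
      return id -> "cancel interactive statement " + id;    // InteractiveQueryHandler
    } else if (DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId)) {
      return id -> "index DML: no EMR-S job run to cancel"; // IndexDMLHandler
    } else {
      return id -> "cancel EMR-S job run " + id;            // batch handler
    }
  }

  public static void main(String[] args) {
    System.out.println(pickHandler(null, DROP_INDEX_JOB_ID).cancelJob(DROP_INDEX_JOB_ID));
    System.out.println(pickHandler("session-1", "job-42").cancelJob("job-42"));
  }
}
```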
```diff
@@ -60,7 +60,6 @@ public DispatchQueryResponse submit(
     return new DispatchQueryResponse(
         AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
         jobId,
-        false,
         dataSourceMetadata.getResultIndex(),
         null);
   }
```
```diff
@@ -11,6 +11,7 @@
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.function.Predicate;
 import org.opensearch.sql.common.setting.Settings;
 import org.opensearch.sql.spark.dispatcher.model.JobType;
@@ -51,7 +52,8 @@ interface Rule<T> extends Predicate<T> {
   public class ConcurrentSessionRule implements Rule<LeaseRequest> {
     @Override
     public String description() {
-      return String.format("domain concurrent active session can not exceed %d", sessionMaxLimit());
+      return String.format(
+          Locale.ROOT, "domain concurrent active session can not exceed %d", sessionMaxLimit());
     }
 
     @Override
```
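Pinning `String.format` to `Locale.ROOT` makes the rule description locale-stable: with no explicit locale the formatter uses `Locale.getDefault()`, and under some locales `%d` is not rendered with ASCII digits. A runnable illustration:

```java
import java.util.Locale;

public class LocaleFormatDemo {
  public static void main(String[] args) {
    int limit = 100;
    // Locale-sensitive: under CLDR data, ar-EG renders %d with Arabic-Indic digits.
    System.out.println(String.format(new Locale("ar", "EG"), "limit %d", limit)); // e.g. "limit ١٠٠"
    // Locale-stable: ASCII digits regardless of the JVM default locale.
    System.out.println(String.format(Locale.ROOT, "limit %d", limit)); // "limit 100"
  }
}
```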
```diff
@@ -19,26 +19,14 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import java.util.*;
-import lombok.Getter;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Ignore;
 import org.junit.Test;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.client.node.NodeClient;
-import org.opensearch.cluster.service.ClusterService;
-import org.opensearch.common.settings.ClusterSettings;
-import org.opensearch.common.settings.Setting;
-import org.opensearch.common.settings.Settings;
-import org.opensearch.core.common.Strings;
-import org.opensearch.index.query.QueryBuilder;
-import org.opensearch.index.query.QueryBuilders;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
 import org.opensearch.sql.datasource.model.DataSourceType;
@@ -47,6 +35,7 @@
 import org.opensearch.sql.spark.execution.session.SessionState;
 import org.opensearch.sql.spark.execution.statement.StatementModel;
 import org.opensearch.sql.spark.execution.statement.StatementState;
+import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException;
 import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
 import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
 import org.opensearch.sql.spark.rest.model.LangType;
```
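The only functional change in this test file is the new `ConcurrencyLimitExceededException` import, which suggests the suite now exercises the lease manager's session cap (the `ConcurrentSessionRule` shown earlier). A self-contained sketch of the behaviour presumably under test — `IllegalStateException` stands in for the real exception, and the limit value is an arbitrary stand-in for the session-limit setting:

```java
// Sketch of the lease check the new import points at; not the repository's code.
final class ConcurrentSessionRuleSketch {
  private final int sessionMaxLimit;
  private int activeSessions;

  ConcurrentSessionRuleSketch(int sessionMaxLimit) {
    this.sessionMaxLimit = sessionMaxLimit;
  }

  void borrow() {
    if (activeSessions >= sessionMaxLimit) {
      // The real code throws ConcurrencyLimitExceededException with a message like this.
      throw new IllegalStateException(
          "domain concurrent active session can not exceed " + sessionMaxLimit);
    }
    activeSessions++;
  }

  public static void main(String[] args) {
    ConcurrentSessionRuleSketch rule = new ConcurrentSessionRuleSketch(2);
    rule.borrow();
    rule.borrow();
    rule.borrow(); // throws: limit exceeded
  }
}
```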
```diff
@@ -7,7 +7,6 @@
 
 import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_ENABLED_SETTING;
 import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME;
 import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;
 import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession;
 import static org.opensearch.sql.spark.execution.statestore.StateStore.updateSessionState;
@@ -27,6 +26,7 @@
 import java.util.Optional;
 import lombok.Getter;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.opensearch.action.admin.indices.create.CreateIndexRequest;
@@ -59,12 +59,14 @@
 import org.opensearch.sql.spark.execution.session.SessionState;
 import org.opensearch.sql.spark.execution.statestore.StateStore;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
+import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
 import org.opensearch.sql.spark.response.JobExecutionResponseReader;
 import org.opensearch.sql.storage.DataSourceFactory;
 import org.opensearch.test.OpenSearchIntegTestCase;
 
 public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase {
   public static final String DATASOURCE = "mys3";
+  public static final String DSOTHER = "mytest";
 
   protected ClusterService clusterService;
   protected org.opensearch.sql.common.setting.Settings pluginSettings;
@@ -92,9 +94,10 @@ public void setup() {
     pluginSettings = new OpenSearchSettings(clusterSettings);
     client = (NodeClient) cluster().client();
     dataSourceService = createDataSourceService();
-    dataSourceService.createDataSource(
+    DataSourceMetadata dm =
         new DataSourceMetadata(
             DATASOURCE,
+            StringUtils.EMPTY,
             DataSourceType.S3GLUE,
             ImmutableList.of(),
             ImmutableMap.of(
@@ -106,9 +109,28 @@
                 "http://localhost:9200",
                 "glue.indexstore.opensearch.auth",
                 "noauth"),
-            null));
+            null);
+    dataSourceService.createDataSource(dm);
+    DataSourceMetadata otherDm =
+        new DataSourceMetadata(
+            DSOTHER,
+            StringUtils.EMPTY,
+            DataSourceType.S3GLUE,
+            ImmutableList.of(),
+            ImmutableMap.of(
+                "glue.auth.type",
+                "iam_role",
+                "glue.auth.role_arn",
+                "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole",
+                "glue.indexstore.opensearch.uri",
+                "http://localhost:9200",
+                "glue.indexstore.opensearch.auth",
+                "noauth"),
+            null);
+    dataSourceService.createDataSource(otherDm);
     stateStore = new StateStore(client, clusterService);
-    createIndexWithMappings(SPARK_RESPONSE_BUFFER_INDEX_NAME, loadResultIndexMappings());
+    createIndexWithMappings(dm.getResultIndex(), loadResultIndexMappings());
+    createIndexWithMappings(otherDm.getResultIndex(), loadResultIndexMappings());
   }
 
   @After
@@ -157,6 +179,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService(
         new FlintIndexMetadataReaderImpl(client),
         client,
         new SessionManager(stateStore, emrServerlessClient, pluginSettings),
+        new DefaultLeaseManager(pluginSettings, stateStore),
         stateStore);
     return new AsyncQueryExecutorServiceImpl(
         asyncQueryJobMetadataStorageService,
```
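Note that `setup()` now passes `null` as the explicit result index yet immediately calls `dm.getResultIndex()` to create the index, which only works if `DataSourceMetadata` falls back to a per-datasource default (replacing the single shared `SPARK_RESPONSE_BUFFER_INDEX_NAME` the old code created). A sketch of that assumed fallback:

```java
import java.util.Locale;

// Assumed defaulting behaviour, inferred from the test's use of dm.getResultIndex()
// after constructing the metadata with a null result index; the exact prefix is a guess.
final class ResultIndexDefaulting {
  private static final String DEFAULT_RESULT_INDEX = "query_execution_result";

  static String resultIndexFor(String datasourceName, String explicitResultIndex) {
    return explicitResultIndex != null
        ? explicitResultIndex
        : DEFAULT_RESULT_INDEX + "_" + datasourceName.toLowerCase(Locale.ROOT);
  }

  public static void main(String[] args) {
    System.out.println(resultIndexFor("mys3", null));   // query_execution_result_mys3
    System.out.println(resultIndexFor("mytest", null)); // query_execution_result_mytest
  }
}
```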
```diff
@@ -31,7 +31,7 @@ public class IndexDMLQuerySpecTest extends AsyncQueryExecutorServiceSpec {
 
   private final FlintDatasetMock LEGACY_SKIPPING =
       new FlintDatasetMock(
-          "DROP SKIPPING INDEX ON" + " mys3" + ".default.http_logs",
+          "DROP SKIPPING INDEX ON mys3.default.http_logs",
           FlintIndexType.SKIPPING,
           "flint_mys3_default_http_logs_skipping_index")
           .isLegacy(true);
@@ -48,13 +48,13 @@
 
   private final FlintDatasetMock SKIPPING =
       new FlintDatasetMock(
-          "DROP SKIPPING INDEX ON mys3" + ".default.http_logs",
+          "DROP SKIPPING INDEX ON mys3.default.http_logs",
           FlintIndexType.SKIPPING,
           "flint_mys3_default_http_logs_skipping_index")
           .latestId("skippingindexid");
   private final FlintDatasetMock COVERING =
       new FlintDatasetMock(
-          "DROP INDEX covering ON mys3" + ".default.http_logs",
+          "DROP INDEX covering ON mys3.default.http_logs",
           FlintIndexType.COVERING,
           "flint_mys3_default_http_logs_covering_index")
           .latestId("coveringid");
```
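The query-string edits above are behaviour-neutral: adjacent `String` literals joined with `+` are constant expressions that javac folds at compile time, so splitting or joining them changes nothing at runtime. A runnable check:

```java
// Both constants compile to the identical folded (and interned) string.
final class ConstantFolding {
  static final String SPLIT = "DROP SKIPPING INDEX ON" + " mys3" + ".default.http_logs";
  static final String JOINED = "DROP SKIPPING INDEX ON mys3.default.http_logs";

  public static void main(String[] args) {
    System.out.println(SPLIT.equals(JOINED)); // true
    System.out.println(SPLIT == JOINED);      // true: same interned constant
  }
}
```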
