Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add CSV formatter in new engine (#870)
Browse files Browse the repository at this point in the history
* support csv format in new engine

* skipped some IT cases that are not applicable in new engine

* udpate

* added escape option

* added test for Format

* update

* update

* added IT for ppl

* added IT for ppl

* added IT for ppl

* added license header

* added test for sql

* added example in protocol doc

* addressed comments

* added responseParams override method to sql and ppl stats actions

* added unit test cases

* update

* addressed comment

* addressed comments
  • Loading branch information
chloe-zh authored and penghuo committed Dec 15, 2020
1 parent f89c0ac commit 6e5010c
Show file tree
Hide file tree
Showing 27 changed files with 1,004 additions and 39 deletions.
35 changes: 35 additions & 0 deletions docs/user/interfaces/protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,41 @@ Result set::
Amber,Duke,32
Dale,Adams,33
Hattie,Bond,36


The formatter sanitizes the csv result with the following rules:

1. If a header cell or data cell is starting with special character including '+', '-', '=' , '@', the sanitizer will insert a single-quote at the start of the cell.

2. If there exists one or more commas (','), the sanitizer will quote the cell with double quotes.

For example::

>> curl -H 'Content-Type: application/json' -X PUT localhost:9200/userdata/_doc/1?refresh=true -d '{
"+firstname": "-Hattie",
"=lastname": "@Bond",
"address": "671 Bristol Street, Dente, TN"
}'
>> curl -H 'Content-Type: application/json' -X POST localhost:9200/_opendistro/_sql?format=csv -d '{
"query" : "SELECT firstname, lastname, address FROM userdata"
}'

Result set::

'+firstname,'=lastname,address
'Hattie,'@Bond,"671 Bristol Street, Dente, TN"


If you prefer escaping the sanitization and keeping the original csv result, you can add a "sanitize" param and set it to false value to skip sanitizing. For example::

>> curl -H 'Content-Type: application/json' -X POST localhost:9200/_opendistro/_sql?format=csv&sanitize=false -d '{
"query" : "SELECT firstname, lastname, address FROM userdata"
}'

Result set::

+firstname,=lastname,address
Hattie,@Bond,671 Bristol Street, Dente, TN

Raw Format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.amazon.opendistroforelasticsearch.sql.legacy;

import static com.amazon.opendistroforelasticsearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
import static com.amazon.opendistroforelasticsearch.sql.legacy.TestsConstants.TEST_INDEX_BANK_CSV_SANITIZE;
import static com.amazon.opendistroforelasticsearch.sql.legacy.TestsConstants.TEST_INDEX_DOG;
import static com.amazon.opendistroforelasticsearch.sql.legacy.TestsConstants.TEST_INDEX_GAME_OF_THRONES;
import static com.amazon.opendistroforelasticsearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_TYPE;
Expand Down Expand Up @@ -64,6 +65,7 @@ protected void init() throws Exception {
loadIndex(Index.DOG);
loadIndex(Index.GAME_OF_THRONES);
loadIndex(Index.ONLINE);
loadIndex(Index.BANK_CSV_SANITIZE);
}

@Override
Expand Down Expand Up @@ -105,6 +107,7 @@ public void specificPercentilesIntAndDouble() throws IOException {
}
}

@Ignore("skip this test since array is not supported in new engine")
@Test
public void nestedObjectsAndArraysAreQuoted() throws IOException {

Expand All @@ -121,6 +124,7 @@ public void nestedObjectsAndArraysAreQuoted() throws IOException {
Assert.assertThat(result, containsString(expectedMessage));
}

@Ignore("skip this test since array is not supported in new engine")
@Test
public void arraysAreQuotedInFlatMode() throws IOException {

Expand All @@ -141,6 +145,7 @@ public void arraysAreQuotedInFlatMode() throws IOException {
setFlatOption(false);
}

@Ignore("skip this test since array is not supported in new engine")
@Test
public void doubleQuotesAreEscapedWithDoubleQuotes() throws IOException {
final String query = "SELECT * FROM " + TEST_INDEX_NESTED_WITH_QUOTES;
Expand Down Expand Up @@ -315,6 +320,7 @@ public void joinSearchResultNotNestedNotFlatNoAggs() throws Exception {
hasRow(null, null, Arrays.asList("F", "fireAndBlood", "Targaryen"), false));
}

@Ignore("skip this test because the result should be integer type without fractional part")
@Test
public void simpleNumericValueAgg() throws Exception {
String query = String.format(Locale.ROOT, "select count(*) from %s ", TEST_INDEX_DOG);
Expand Down Expand Up @@ -348,6 +354,7 @@ public void simpleNumericValueAggWithAlias() throws Exception {

}

@Ignore("skip this test because the result should be integer type without fractional part")
@Test
public void twoNumericAggWithAlias() throws Exception {
String query =
Expand All @@ -372,6 +379,7 @@ public void twoNumericAggWithAlias() throws Exception {

}

@Ignore("skip this test because the result should be integer type without fractional part")
@Test
public void aggAfterTermsGroupBy() throws Exception {
String query = String.format(Locale.ROOT, "SELECT COUNT(*) FROM %s GROUP BY gender",
Expand Down Expand Up @@ -595,6 +603,7 @@ public void twoCharsSeperator() throws Exception {

}

@Ignore("skip this test since flat, socre, type, id are not applicable in new engine")
@Test
public void includeIdAndNotTypeOrScore() throws Exception {
String query = String.format(Locale.ROOT,
Expand All @@ -609,6 +618,8 @@ public void includeIdAndNotTypeOrScore() throws Exception {
Assert.assertTrue(lines.get(0).contains(",437") || lines.get(0).contains("437,"));
}


@Ignore("skip this test since flat, socre, type, id are not applicable in new engine")
@Test
public void includeIdAndTypeButNoScore() throws Exception {
String query = String.format(Locale.ROOT,
Expand All @@ -625,6 +636,7 @@ public void includeIdAndTypeButNoScore() throws Exception {
}
//endregion Tests migrated from CSVResultsExtractorTests

@Ignore("new engine recognizes the following data as struct type")
@Test
public void sensitiveCharacterSanitizeTest() throws IOException {
String requestBody =
Expand All @@ -649,6 +661,7 @@ public void sensitiveCharacterSanitizeTest() throws IOException {
Assert.assertTrue(lines.get(0).contains("'@cmd|' /C notepad'!_xlbgnm.A1"));
}

@Ignore("new engine recognizes the following data as struct type")
@Test
public void sensitiveCharacterSanitizeAndQuotedTest() throws IOException {
String requestBody =
Expand All @@ -675,6 +688,19 @@ public void sensitiveCharacterSanitizeAndQuotedTest() throws IOException {
Assert.assertTrue(lines.get(0).contains("\",,,@cmd|' /C notepad'!_xlbgnm.A1\""));
}

@Test
public void sanitizeTest() throws IOException {
CSVResult csvResult = executeCsvRequest(
String.format(Locale.ROOT, "SELECT firstname, lastname FROM %s", TEST_INDEX_BANK_CSV_SANITIZE), false);
List<String> lines = csvResult.getLines();
assertEquals(5, lines.size());
assertEquals(lines.get(0), "'+Amber JOHnny,Duke Willmington+");
assertEquals(lines.get(1), "'-Hattie,Bond-");
assertEquals(lines.get(2), "'=Nanette,Bates=");
assertEquals(lines.get(3), "'@Dale,Adams@");
assertEquals(lines.get(4), "\",Elinor\",\"Ratliff,,,\"");
}

@Test
public void selectFunctionAsFieldTest() throws IOException {
String query = "select log(age) from " + TEST_INDEX_ACCOUNT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ public enum Index {
"strings",
getStringIndexMapping(),
"src/test/resources/strings.json"),
BANK_CSV_SANITIZE(TestsConstants.TEST_INDEX_BANK_CSV_SANITIZE,
"account",
getBankIndexMapping(),
"src/test/resources/bank_csv_sanitize.json"),
ORDER(TestsConstants.TEST_INDEX_ORDER,
"_doc",
getOrderIndexMapping(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class TestsConstants {
public final static String TEST_INDEX_BANK_TWO = TEST_INDEX_BANK + "_two";
public final static String TEST_INDEX_BANK_WITH_NULL_VALUES =
TEST_INDEX_BANK + "_with_null_values";
public final static String TEST_INDEX_BANK_CSV_SANITIZE = TEST_INDEX_BANK + "_csv_sanitize";
public final static String TEST_INDEX_ORDER = TEST_INDEX + "_order";
public final static String TEST_INDEX_WEBLOG = TEST_INDEX + "_weblog";
public final static String TEST_INDEX_DATE = TEST_INDEX + "_date";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.ppl;

import static com.amazon.opendistroforelasticsearch.sql.legacy.TestsConstants.TEST_INDEX_BANK_CSV_SANITIZE;

import java.io.IOException;
import java.util.Locale;
import org.junit.Test;

public class CsvFormatIT extends PPLIntegTestCase {

@Override
public void init() throws IOException {
loadIndex(Index.BANK_CSV_SANITIZE);
}

@Test
public void sanitizeTest() throws IOException {
String result = executeCsvQuery(
String.format(Locale.ROOT, "source=%s | fields firstname, lastname", TEST_INDEX_BANK_CSV_SANITIZE));
assertEquals(
"firstname,lastname\n"
+ "'+Amber JOHnny,Duke Willmington+\n"
+ "'-Hattie,Bond-\n"
+ "'=Nanette,Bates=\n"
+ "'@Dale,Adams@\n"
+ "\",Elinor\",\"Ratliff,,,\"\n",
result);
}

@Test
public void escapeSanitizeTest() throws IOException {
String result = executeCsvQuery(
String.format(Locale.ROOT, "source=%s | fields firstname, lastname", TEST_INDEX_BANK_CSV_SANITIZE), false);
assertEquals(
"firstname,lastname\n"
+ "+Amber JOHnny,Duke Willmington+\n"
+ "-Hattie,Bond-\n"
+ "=Nanette,Bates=\n"
+ "@Dale,Adams@\n"
+ ",Elinor,Ratliff,,,\n",
result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ protected String explainQueryToString(String query) throws IOException {
return getResponseBody(response, true);
}

protected String executeCsvQuery(String query, boolean sanitize) throws IOException {
Request request = buildRequest(query,
QUERY_API_ENDPOINT + String.format(Locale.ROOT, "?format=csv&sanitize=%b", sanitize));
Response response = client().performRequest(request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
return getResponseBody(response, true);
}

protected String executeCsvQuery(String query) throws IOException {
return executeCsvQuery(query, true);
}

protected Request buildRequest(String query, String endpoint) {
Request request = new Request("POST", endpoint);
request.setJsonEntity(String.format(Locale.ROOT, "{\n" + " \"query\": \"%s\"\n" + "}", query));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.sql;

import static com.amazon.opendistroforelasticsearch.sql.legacy.TestsConstants.TEST_INDEX_BANK_CSV_SANITIZE;

import com.amazon.opendistroforelasticsearch.sql.legacy.SQLIntegTestCase;
import java.io.IOException;
import java.util.Locale;
import org.junit.Test;

public class CsvFormatIT extends SQLIntegTestCase {

@Override
public void init() throws IOException {
loadIndex(Index.BANK_CSV_SANITIZE);
}

@Test
public void sanitizeTest() {
String result = executeQuery(
String.format(Locale.ROOT, "SELECT firstname, lastname FROM %s", TEST_INDEX_BANK_CSV_SANITIZE), "csv");
assertEquals(
"firstname,lastname\n"
+ "'+Amber JOHnny,Duke Willmington+\n"
+ "'-Hattie,Bond-\n"
+ "'=Nanette,Bates=\n"
+ "'@Dale,Adams@\n"
+ "\",Elinor\",\"Ratliff,,,\"\n",
result);
}

@Test
public void escapeSanitizeTest() {
String result = executeQuery(
String.format(Locale.ROOT, "SELECT firstname, lastname FROM %s", TEST_INDEX_BANK_CSV_SANITIZE),
"csv&sanitize=false");
assertEquals(
"firstname,lastname\n"
+ "+Amber JOHnny,Duke Willmington+\n"
+ "-Hattie,Bond-\n"
+ "=Nanette,Bates=\n"
+ "@Dale,Adams@\n"
+ ",Elinor,Ratliff,,,\n",
result);
}
}
10 changes: 10 additions & 0 deletions integ-test/src/test/resources/bank_csv_sanitize.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"index":{"_id":"1"}}
{"account_number":1,"balance":39225,"firstname":"+Amber JOHnny","lastname":"Duke Willmington+","age":32,"gender":"M","address":"880 Holmes Lane","employer":"Pyrami","email":"amberduke@pyrami.com","city":"Brogan","state":"IL", "male" : true, "birthdate" : "2017-10-23"}
{"index":{"_id":"6"}}
{"account_number":6,"balance":5686,"firstname":"-Hattie","lastname":"Bond-","age":36,"gender":"M","address":"671 Bristol Street","employer":"Netagy","email":"hattiebond@netagy.com","city":"Dante","state":"TN", "male" : true, "birthdate" : "2017-11-20"}
{"index":{"_id":"13"}}
{"account_number":13,"balance":32838,"firstname":"=Nanette","lastname":"Bates=","age":28,"gender":"F","address":"789 Madison Street","employer":"Quility","email":"nanettebates@quility.com","city":"Nogal","state":"VA", "male" : false, "birthdate" : "2018-06-23"}
{"index":{"_id":"18"}}
{"account_number":18,"balance":4180,"firstname":"@Dale","lastname":"Adams@","age":33,"gender":"M","address":"467 Hutchinson Court","employer":"Boink","email":"daleadams@boink.com","city":"Orick","state":"MD","male" : true, "birthdate" : 1542152000000}
{"index":{"_id":"20"}}
{"account_number":20,"balance":16418,"firstname":",Elinor","lastname":"Ratliff,,,","age":36,"gender":"M","address":"282 Kings Place","employer":"Scentric","email":"elinorratliff@scentric.com","city":"Ribera","state":"WA", "male" : true, "birthdate" : "2018-06-27"}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan;
import com.amazon.opendistroforelasticsearch.sql.protocol.response.QueryResult;
import com.amazon.opendistroforelasticsearch.sql.protocol.response.format.JdbcResponseFormatter;
import com.amazon.opendistroforelasticsearch.sql.protocol.response.format.CsvResponseFormatter;
import com.amazon.opendistroforelasticsearch.sql.protocol.response.format.Format;
import com.amazon.opendistroforelasticsearch.sql.protocol.response.format.JsonResponseFormatter;
import com.amazon.opendistroforelasticsearch.sql.protocol.response.format.ResponseFormatter;
import com.amazon.opendistroforelasticsearch.sql.sql.SQLService;
import com.amazon.opendistroforelasticsearch.sql.sql.config.SQLServiceConfig;
import com.amazon.opendistroforelasticsearch.sql.sql.domain.SQLQueryRequest;
Expand Down Expand Up @@ -65,6 +68,9 @@ public class RestSQLQueryAction extends BaseRestHandler {
*/
private final Settings pluginSettings;

/**
* Constructor of RestSQLQueryAction.
*/
public RestSQLQueryAction(ClusterService clusterService, Settings pluginSettings) {
super();
this.clusterService = clusterService;
Expand Down Expand Up @@ -112,7 +118,7 @@ public RestChannelConsumer prepareRequest(SQLQueryRequest request, NodeClient no
if (request.isExplainRequest()) {
return channel -> sqlService.explain(plan, createExplainResponseListener(channel));
}
return channel -> sqlService.execute(plan, createQueryResponseListener(channel));
return channel -> sqlService.execute(plan, createQueryResponseListener(channel, request));
}

private SQLService createSQLService(NodeClient client) {
Expand Down Expand Up @@ -149,8 +155,14 @@ public void onFailure(Exception e) {
};
}

private ResponseListener<QueryResponse> createQueryResponseListener(RestChannel channel) {
JdbcResponseFormatter formatter = new JdbcResponseFormatter(PRETTY);
private ResponseListener<QueryResponse> createQueryResponseListener(RestChannel channel, SQLQueryRequest request) {
Format format = request.format();
ResponseFormatter<QueryResult> formatter;
if (format.equals(Format.CSV)) {
formatter = new CsvResponseFormatter(request.sanitize());
} else {
formatter = new JdbcResponseFormatter(PRETTY);
}
return new ResponseListener<QueryResponse>() {
@Override
public void onResponse(QueryResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
if (isNewEngineEnabled() && isCursorDisabled()) {
// Route request to new query engine if it's supported already
SQLQueryRequest newSqlRequest = new SQLQueryRequest(sqlRequest.getJsonContent(),
sqlRequest.getSql(),
request.path(),
format.getFormatName());
sqlRequest.getSql(), request.path(), request.params());
RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client);
if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) {
LOG.info("[{}] Request {} is handled by new SQL query engine",
Expand All @@ -175,7 +173,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
@Override
protected Set<String> responseParams() {
Set<String> responseParams = new HashSet<>(super.responseParams());
responseParams.addAll(Arrays.asList("sql", "flat", "separator", "_score", "_type", "_id", "newLine", "format"));
responseParams.addAll(Arrays.asList("sql", "flat", "separator", "_score", "_type", "_id", "newLine", "format", "sanitize"));
return responseParams;
}

Expand Down
Loading

0 comments on commit 6e5010c

Please sign in to comment.