Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add PPL stats endpoint #706

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.ppl;

import static com.amazon.opendistroforelasticsearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
import static org.hamcrest.Matchers.equalTo;

import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.MetricName;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;

public class MetricsIT extends PPLIntegTestCase {

@Override
protected void init() throws Exception {
loadIndex(Index.BANK);
}

@Test
public void requestCount() throws IOException, InterruptedException {
int beforeQueries = pplRequestTotal();
multiQueries(3);
TimeUnit.SECONDS.sleep(2L);

assertThat(pplRequestTotal(), equalTo(beforeQueries + 3));
}

private void multiQueries(int n) throws IOException {
for (int i = 0; i < n; ++i) {
executeQuery(String.format("source=%s | where age = 31 + 1 | fields age", TEST_INDEX_BANK));
}
}

private Request makeStatRequest() {
return new Request(
"GET", "/_opendistro/_ppl/stats"
);
}

private int pplRequestTotal() throws IOException {
JSONObject jsonObject = new JSONObject(executeStatRequest(makeStatRequest()));
return jsonObject.getInt(MetricName.PPL_REQ_TOTAL.getName());
}

private String executeStatRequest(final Request request) throws IOException {
Response sqlResponse = client().performRequest(request);

Assert.assertTrue(sqlResponse.getStatusLine().getStatusCode() == 200);

InputStream is = sqlResponse.getEntity().getContent();
StringBuilder sb = new StringBuilder();
try (BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
String line = null;
while ((line = br.readLine()) != null) {
sb.append(line);
}
}

return sb.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public static Metric createMetric(MetricName name) {
case REQ_TOTAL:
case DEFAULT_CURSOR_REQUEST_TOTAL:
case DEFAULT:
case PPL_REQ_TOTAL:
return new NumericMetric<>(name.getName(), new BasicCounter());
case CIRCUIT_BREAKER:
return new GaugeMetric<>(name.getName(), BackOffRetryStrategy.GET_CB_STATE);
Expand All @@ -33,6 +34,9 @@ public static Metric createMetric(MetricName name) {
case FAILED_REQ_COUNT_CUS:
case FAILED_REQ_COUNT_SYS:
case FAILED_REQ_COUNT_CB:
case PPL_REQ_COUNT_TOTAL:
case PPL_FAILED_REQ_COUNT_CUS:
case PPL_FAILED_REQ_COUNT_SYS:
return new NumericMetric<>(name.getName(), new RollingCounter());
default:
return new NumericMetric<>(name.getName(), new BasicCounter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package com.amazon.opendistroforelasticsearch.sql.legacy.metrics;

import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public enum MetricName {
Expand All @@ -29,7 +31,12 @@ public enum MetricName {
DEFAULT_CURSOR_REQUEST_TOTAL("default_cursor_request_total"),
DEFAULT_CURSOR_REQUEST_COUNT_TOTAL("default_cursor_request_count"),
CIRCUIT_BREAKER("circuit_breaker"),
DEFAULT("default");
DEFAULT("default"),

PPL_REQ_TOTAL("ppl_request_total"),
PPL_REQ_COUNT_TOTAL("ppl_request_count"),
PPL_FAILED_REQ_COUNT_SYS("ppl_failed_request_count_syserr"),
PPL_FAILED_REQ_COUNT_CUS("ppl_failed_request_count_cuserr");

private String name;

Expand All @@ -45,10 +52,18 @@ public static List<String> getNames() {
return Arrays.stream(MetricName.values()).map(v -> v.name).collect(Collectors.toList());
}


private static Set<MetricName> NUMERICAL_METRIC = new ImmutableSet.Builder<MetricName>()
.add(PPL_REQ_TOTAL)
.add(PPL_REQ_COUNT_TOTAL)
.add(PPL_FAILED_REQ_COUNT_SYS)
.add(PPL_FAILED_REQ_COUNT_CUS)
.build();

public boolean isNumerical() {
return this == REQ_TOTAL || this == REQ_COUNT_TOTAL || this == FAILED_REQ_COUNT_SYS
|| this == FAILED_REQ_COUNT_CUS || this == FAILED_REQ_COUNT_CB || this == DEFAULT
|| this == DEFAULT_CURSOR_REQUEST_TOTAL || this == DEFAULT_CURSOR_REQUEST_COUNT_TOTAL;
|| this == FAILED_REQ_COUNT_CUS || this == FAILED_REQ_COUNT_CB || this == DEFAULT
|| this == DEFAULT_CURSOR_REQUEST_TOTAL || this == DEFAULT_CURSOR_REQUEST_COUNT_TOTAL
|| NUMERICAL_METRIC.contains(this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.amazon.opendistroforelasticsearch.sql.legacy.plugin.RestSqlStatsAction;
import com.amazon.opendistroforelasticsearch.sql.legacy.plugin.SqlSettings;
import com.amazon.opendistroforelasticsearch.sql.plugin.rest.RestPPLQueryAction;
import com.amazon.opendistroforelasticsearch.sql.plugin.rest.RestPPLStatsAction;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -100,7 +101,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestPPLQueryAction(restController, clusterService, pluginSettings, settings),
new RestSqlAction(settings, clusterService, pluginSettings),
new RestSqlStatsAction(settings, restController),
new RestSqlSettingsAction(settings, restController)
new RestSqlSettingsAction(settings, restController),
new RestPPLStatsAction(settings, restController)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import com.amazon.opendistroforelasticsearch.sql.exception.QueryEngineException;
import com.amazon.opendistroforelasticsearch.sql.exception.SemanticCheckException;
import com.amazon.opendistroforelasticsearch.sql.executor.ExecutionEngine.QueryResponse;
import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.MetricName;
import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.Metrics;
import com.amazon.opendistroforelasticsearch.sql.legacy.utils.LogUtils;
import com.amazon.opendistroforelasticsearch.sql.plugin.request.PPLQueryRequestFactory;
import com.amazon.opendistroforelasticsearch.sql.ppl.PPLService;
import com.amazon.opendistroforelasticsearch.sql.ppl.config.PPLServiceConfig;
Expand Down Expand Up @@ -97,6 +100,11 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nodeClient) {
Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_TOTAL).increment();
Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_COUNT_TOTAL).increment();

LogUtils.addRequestId();

if (!pplEnabled.get()) {
return channel -> reportError(channel, new IllegalAccessException(
"Either opendistro.ppl.enabled or rest.action.multi.allow_explicit_index setting is false"
Expand Down Expand Up @@ -145,7 +153,13 @@ public void onResponse(QueryResponse response) {
@Override
public void onFailure(Exception e) {
LOG.error("Error happened during query handling", e);
reportError(channel, e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE);
if (isClientError(e)) {
Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS).increment();
reportError(channel, e, BAD_REQUEST);
} else {
Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS).increment();
reportError(channel, e, SERVICE_UNAVAILABLE);
}
}

private void sendResponse(RestStatus status, String content) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.plugin.rest;

import static org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE;

import com.amazon.opendistroforelasticsearch.sql.legacy.executor.format.ErrorMessageFactory;
import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.Metrics;
import com.amazon.opendistroforelasticsearch.sql.legacy.utils.LogUtils;
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;

/**
* PPL Node level status.
*/
public class RestPPLStatsAction extends BaseRestHandler {

private static final Logger LOG = LogManager.getLogger(RestPPLStatsAction.class);

/**
* API endpoint path.
*/
public static final String PPL_STATS_API_ENDPOINT = "/_opendistro/_ppl/stats";

public RestPPLStatsAction(Settings settings, RestController restController) {
super();
}

@Override
public String getName() {
return "ppl_stats_action";
}

@Override
public List<Route> routes() {
return ImmutableList.of(
new Route(RestRequest.Method.POST, PPL_STATS_API_ENDPOINT),
new Route(RestRequest.Method.GET, PPL_STATS_API_ENDPOINT)
);
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {

LogUtils.addRequestId();

try {
return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.OK,
Metrics.getInstance().collectToJSON()));
} catch (Exception e) {
LOG.error("Failed during Query PPL STATS Action.", e);

return channel -> channel.sendResponse(new BytesRestResponse(SERVICE_UNAVAILABLE,
ErrorMessageFactory.createErrorMessage(e, SERVICE_UNAVAILABLE.getStatus()).toString()));
}
}
}