Skip to content

Commit

Permalink
ES|QL CCS uses skip_unavailable setting for handling disconnected rem…
Browse files Browse the repository at this point in the history
…ote clusters (elastic#115266)

As part of ES|QL planning of a cross-cluster search, a field-caps call is done to each cluster and,
if an ENRICH command is present, the enrich policy-resolve API is called on each remote. If a
remote cluster cannot be connected to in these calls, the outcome depends on the
skip_unavailable setting.

For skip_unavailable=false clusters, the error is fatal and is immediately propagated back to the
client as a top-level error message with a 500 HTTP status code.

For skip_unavailable=true clusters, the error is not fatal. The error will be trapped and recorded in the
EsqlExecutionInfo object for the query, and the cluster will be marked as SKIPPED. If the user requested
CCS metadata to be included, the cluster status and connection failure will be present in the
_clusters/details section of the response.

If no clusters can be contacted and they are all marked as skip_unavailable=true, no error will be
returned. Instead a 200 HTTP status will be returned with no columns and no values. If the
include_ccs_metadata: true setting was included on the query, the errors will be listed in the
_clusters metadata section. (Note: this is also how the _search endpoint works for CCS.)

Partially addresses elastic#114531
  • Loading branch information
quux00 authored Oct 29, 2024
1 parent e5d5c17 commit b6d2d4b
Show file tree
Hide file tree
Showing 17 changed files with 1,819 additions and 144 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/115266.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 115266
summary: ES|QL CCS uses `skip_unavailable` setting for handling disconnected remote
clusters
area: ES|QL
type: enhancement
issues: [ 114531 ]
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_REQUEST_REMOVE_METERING = def(8_780_00_0);
public static final TransportVersion CPU_STAT_STRING_PARSING = def(8_781_00_0);
public static final TransportVersion QUERY_RULES_RETRIEVER = def(8_782_00_0);
public static final TransportVersion ESQL_CCS_EXEC_INFO_WITH_FAILURES = def(8_783_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
Expand Down Expand Up @@ -246,7 +247,8 @@ public void testSearchesWhereMissingIndicesAreSpecified() {

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("no_such_index"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
// TODO: a follow on PR will change this to throw an Exception when the local cluster requests a concrete index that is missing
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertThat(localCluster.getTotalShards(), equalTo(0));
Expand Down Expand Up @@ -499,7 +501,7 @@ public void testCCSExecutionOnSearchesWithLimit0() {

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("nomatch*"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertThat(remoteCluster.getTotalShards(), equalTo(0));
Expand Down Expand Up @@ -803,6 +805,14 @@ Map<String, Object> setupTwoClusters() {
clusterInfo.put("local.index", localIndex);
clusterInfo.put("remote.num_shards", numShardsRemote);
clusterInfo.put("remote.index", remoteIndex);

String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER);
Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER).clusterService().getClusterSettings().get(skipUnavailableKey);
boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
.getClusterSettings()
.get(skipUnavailableSetting);
clusterInfo.put("remote.skip_unavailable", skipUnavailable);

return clusterInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -281,6 +282,7 @@ public static class Cluster implements ToXContentFragment, Writeable {
private final Integer successfulShards;
private final Integer skippedShards;
private final Integer failedShards;
private final List<ShardSearchFailure> failures;
private final TimeValue took; // search latency for this cluster sub-search

/**
Expand All @@ -300,7 +302,7 @@ public String toString() {
}

public Cluster(String clusterAlias, String indexExpression) {
this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null);
this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null, null);
}

/**
Expand All @@ -312,7 +314,7 @@ public Cluster(String clusterAlias, String indexExpression) {
* @param skipUnavailable whether this Cluster is marked as skip_unavailable in remote cluster settings
*/
public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable) {
this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null);
this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null);
}

/**
Expand All @@ -324,7 +326,7 @@ public Cluster(String clusterAlias, String indexExpression, boolean skipUnavaila
* @param status current status of the search on this Cluster
*/
public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable, Cluster.Status status) {
this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null);
this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null, null);
}

public Cluster(
Expand All @@ -336,6 +338,7 @@ public Cluster(
Integer successfulShards,
Integer skippedShards,
Integer failedShards,
List<ShardSearchFailure> failures,
TimeValue took
) {
assert clusterAlias != null : "clusterAlias cannot be null";
Expand All @@ -349,6 +352,11 @@ public Cluster(
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.failedShards = failedShards;
if (failures == null) {
this.failures = List.of();
} else {
this.failures = failures;
}
this.took = took;
}

Expand All @@ -362,6 +370,11 @@ public Cluster(StreamInput in) throws IOException {
this.failedShards = in.readOptionalInt();
this.took = in.readOptionalTimeValue();
this.skipUnavailable = in.readBoolean();
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) {
this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure));
} else {
this.failures = List.of();
}
}

@Override
Expand All @@ -375,6 +388,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalInt(failedShards);
out.writeOptionalTimeValue(took);
out.writeBoolean(skipUnavailable);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) {
out.writeCollection(failures);
}
}

/**
Expand All @@ -387,12 +403,12 @@ public void writeTo(StreamOutput out) throws IOException {
* All other fields can be set and override the value in the "copyFrom" Cluster.
*/
public static class Builder {
private String indexExpression;
private Cluster.Status status;
private Integer totalShards;
private Integer successfulShards;
private Integer skippedShards;
private Integer failedShards;
private List<ShardSearchFailure> failures;
private TimeValue took;
private final Cluster original;

Expand All @@ -408,22 +424,18 @@ public Builder(Cluster copyFrom) {
public Cluster build() {
return new Cluster(
original.getClusterAlias(),
indexExpression == null ? original.getIndexExpression() : indexExpression,
original.getIndexExpression(),
original.isSkipUnavailable(),
status != null ? status : original.getStatus(),
totalShards != null ? totalShards : original.getTotalShards(),
successfulShards != null ? successfulShards : original.getSuccessfulShards(),
skippedShards != null ? skippedShards : original.getSkippedShards(),
failedShards != null ? failedShards : original.getFailedShards(),
failures != null ? failures : original.getFailures(),
took != null ? took : original.getTook()
);
}

public Cluster.Builder setIndexExpression(String indexExpression) {
this.indexExpression = indexExpression;
return this;
}

public Cluster.Builder setStatus(Cluster.Status status) {
this.status = status;
return this;
Expand All @@ -449,6 +461,11 @@ public Cluster.Builder setFailedShards(int failedShards) {
return this;
}

/**
 * Sets the shard search failures to record on the Cluster being built.
 *
 * @param failures failures to use; if this is never set (stays {@code null}), {@code build()} falls back
 *                 to the failures of the original Cluster this Builder was created from
 * @return this Builder, so calls can be chained
 */
public Cluster.Builder setFailures(List<ShardSearchFailure> failures) {
this.failures = failures;
return this;
}

public Cluster.Builder setTook(TimeValue took) {
this.took = took;
return this;
Expand All @@ -466,7 +483,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString());
builder.field(INDICES_FIELD.getPreferredName(), indexExpression);
if (took != null) {
// TODO: change this to took_nanos and call took.nanos?
builder.field(TOOK.getPreferredName(), took.millis());
}
if (totalShards != null) {
Expand All @@ -483,6 +499,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}
if (failures != null && failures.size() > 0) {
builder.startArray(RestActions.FAILURES_FIELD.getPreferredName());
for (ShardSearchFailure failure : failures) {
failure.toXContent(builder, params);
}
builder.endArray();
}
}
builder.endObject();
return builder;
Expand Down Expand Up @@ -529,6 +552,10 @@ public Integer getFailedShards() {
return failedShards;
}

/**
 * Returns the shard search failures recorded for this cluster.
 * The list held by this Cluster is returned directly, not a copy. The constructors in this class store an
 * empty list when given {@code null} or when reading from a stream that predates
 * {@code ESQL_CCS_EXEC_INFO_WITH_FAILURES}, so callers can expect a non-null list.
 *
 * @return the failures for this cluster, possibly empty
 */
public List<ShardSearchFailure> getFailures() {
return failures;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class EnrichResolution {

private final Map<Key, ResolvedEnrichPolicy> resolvedPolicies = ConcurrentCollections.newConcurrentMap();
private final Map<Key, String> errors = ConcurrentCollections.newConcurrentMap();
private final Map<String, Exception> unavailableClusters = ConcurrentCollections.newConcurrentMap();

public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) {
return resolvedPolicies.get(new Key(policyName, mode));
Expand Down Expand Up @@ -51,6 +52,14 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
errors.putIfAbsent(new Key(policyName, mode), reason);
}

/**
 * Records that a cluster could not be reached during enrich policy resolution.
 * Uses a plain {@code put}, so a later call for the same alias replaces the previously stored exception
 * (unlike {@code addError}, which keeps the first entry via {@code putIfAbsent}).
 *
 * @param clusterAlias alias of the cluster that was unavailable
 * @param e exception describing why the cluster could not be contacted
 */
public void addUnavailableCluster(String clusterAlias, Exception e) {
unavailableClusters.put(clusterAlias, e);
}

/**
 * Returns the clusters recorded as unavailable, keyed by cluster alias, with the exception stored for each.
 * The backing concurrent map itself is returned rather than a copy, so later additions are visible to the caller.
 *
 * @return map of cluster alias to the exception recorded for that cluster
 */
public Map<String, Exception> getUnavailableClusters() {
return unavailableClusters;
}

private record Key(String policyName, Enrich.Mode mode) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.enrich;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -50,6 +51,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -113,12 +115,27 @@ public void resolvePolicies(
final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
final EnrichResolution enrichResolution = new EnrichResolution();

Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();

for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
String clusterAlias = entry.getKey();
if (entry.getValue().connectionError != null) {
enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError);
// remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy
remoteClusters.remove(clusterAlias);
} else {
lookupResponsesToProcess.put(clusterAlias, entry.getValue());
}
}

for (UnresolvedPolicy unresolved : unresolvedPolicies) {
Tuple<ResolvedEnrichPolicy, String> resolved = mergeLookupResults(
unresolved,
calculateTargetClusters(unresolved.mode, includeLocal, remoteClusters),
lookupResponses
lookupResponsesToProcess
);

if (resolved.v1() != null) {
enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.v1());
} else {
Expand Down Expand Up @@ -149,13 +166,16 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults(
Collection<String> targetClusters,
Map<String, LookupResponse> lookupResults
) {
assert targetClusters.isEmpty() == false;
String policyName = unresolved.name;
if (targetClusters.isEmpty()) {
return Tuple.tuple(null, "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable");
}
final Map<String, ResolvedEnrichPolicy> policies = new HashMap<>();
final List<String> failures = new ArrayList<>();
for (String cluster : targetClusters) {
LookupResponse lookupResult = lookupResults.get(cluster);
if (lookupResult != null) {
assert lookupResult.connectionError == null : "Should never have a non-null connectionError here";
ResolvedEnrichPolicy policy = lookupResult.policies.get(policyName);
if (policy != null) {
policies.put(cluster, policy);
Expand Down Expand Up @@ -261,22 +281,34 @@ private void lookupPolicies(
if (remotePolicies.isEmpty() == false) {
for (String cluster : remoteClusters) {
ActionListener<LookupResponse> lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp));
getRemoteConnection(
cluster,
lookupListener.delegateFailureAndWrap(
(delegate, connection) -> transportService.sendRequest(
getRemoteConnection(cluster, new ActionListener<Transport.Connection>() {
@Override
public void onResponse(Transport.Connection connection) {
transportService.sendRequest(
connection,
RESOLVE_ACTION_NAME,
new LookupRequest(cluster, remotePolicies),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
delegate,
LookupResponse::new,
threadPool.executor(ThreadPool.Names.SEARCH)
)
)
)
);
new ActionListenerResponseHandler<>(lookupListener.delegateResponse((l, e) -> {
if (ExceptionsHelper.isRemoteUnavailableException(e)
&& remoteClusterService.isSkipUnavailable(cluster)) {
l.onResponse(new LookupResponse(e));
} else {
l.onFailure(e);
}
}), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH))
);
}

@Override
public void onFailure(Exception e) {
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) {
lookupListener.onResponse(new LookupResponse(e));
} else {
lookupListener.onFailure(e);
}
}
});
}
}
// local cluster
Expand Down Expand Up @@ -323,16 +355,30 @@ public void writeTo(StreamOutput out) throws IOException {
private static class LookupResponse extends TransportResponse {
final Map<String, ResolvedEnrichPolicy> policies;
final Map<String, String> failures;
// does not need to be Writeable since this indicates a failure to contact a remote cluster, so it is only set on the querying cluster
final transient Exception connectionError;

/**
 * Creates a response for a lookup that was actually carried out; {@code connectionError} is set to {@code null}.
 *
 * @param policies resolved enrich policies, looked up by policy name by the callers in this file
 * @param failures failure messages from the lookup (presumably also keyed by policy name; confirm against the producer)
 */
LookupResponse(Map<String, ResolvedEnrichPolicy> policies, Map<String, String> failures) {
this.policies = policies;
this.failures = failures;
this.connectionError = null;
}

/**
 * Use this constructor when the remote cluster is unavailable to indicate inability to do the enrich policy lookup.
 * Both {@code policies} and {@code failures} are set to empty maps, so the only information carried is the
 * connection error itself.
 * @param connectionError Exception received when trying to connect to a remote cluster
 */
LookupResponse(Exception connectionError) {
this.policies = Collections.emptyMap();
this.failures = Collections.emptyMap();
this.connectionError = connectionError;
}

/**
 * Reads a response from the given stream: first the policies map, then the failures map.
 * {@code connectionError} is not read from the stream and is always {@code null} on an instance built this way.
 *
 * @param in stream to read from; it is wrapped in a {@code PlanStreamInput} before the maps are read
 * @throws IOException if reading from the stream fails
 */
LookupResponse(StreamInput in) throws IOException {
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
this.policies = planIn.readMap(StreamInput::readString, ResolvedEnrichPolicy::new);
this.failures = planIn.readMap(StreamInput::readString, StreamInput::readString);
// never sent over the wire, so there is nothing to read for it
this.connectionError = null;
}

@Override
Expand Down
Loading

0 comments on commit b6d2d4b

Please sign in to comment.