Skip to content

Commit

Permalink
[8.x] Add CCS tests for index filtering (elastic#119619) (elastic#119716
Browse files Browse the repository at this point in the history
)

* Add CCS tests for index filtering (elastic#119619)

* Add CCS tests for index filtering

See also: elastic#116755

* Don't run the test on pre-8.18
  • Loading branch information
smalyshev authored Jan 8, 2025
1 parent d521232 commit 9d8504f
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.ccq;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.http.HttpHost;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.xpack.esql.qa.rest.RequestIndexFilteringTestCase;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.io.IOException;

@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class RequestIndexFilteringIT extends RequestIndexFilteringTestCase {

static ElasticsearchCluster remoteCluster = Clusters.remoteCluster();
static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster);

@ClassRule
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);
private static RestClient remoteClient;

@Override
protected String getTestRestCluster() {
return localCluster.getHttpAddresses();
}

@Before
public void setRemoteClient() throws IOException {
if (remoteClient == null) {
var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
remoteClient = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
}
}

@BeforeClass
public static void checkVersion() {
assumeTrue("skip if version before 8.18", Clusters.localClusterVersion().onOrAfter(Version.V_8_18_0));
}

@AfterClass
public static void closeRemoteClients() throws IOException {
try {
IOUtils.close(remoteClient);
} finally {
remoteClient = null;
}
}

@Override
protected void indexTimestampData(int docs, String indexName, String date, String differentiatorFieldName) throws IOException {
indexTimestampDataForClient(client(), docs, indexName, date, differentiatorFieldName);
indexTimestampDataForClient(remoteClient, docs, indexName, date, differentiatorFieldName);
}

@Override
protected String from(String... indexName) {
if (randomBoolean()) {
return "FROM *:" + String.join(",*:", indexName);
} else {
return "FROM " + String.join(",", indexName);
}
}

@After
public void wipeRemoteTestData() throws IOException {
try {
var response = remoteClient.performRequest(new Request("DELETE", "/test*"));
assertEquals(200, response.getStatusLine().getStatusCode());
} catch (ResponseException re) {
assertEquals(404, re.getResponse().getStatusLine().getStatusCode());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.esql.AssertWarnings;
Expand All @@ -30,12 +31,13 @@
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.entityToMap;
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.requestObjectBuilder;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;

public abstract class RequestIndexFilteringTestCase extends ESRestTestCase {

Expand All @@ -49,14 +51,18 @@ public void wipeTestData() throws IOException {
}
}

protected String from(String... indexName) {
return "FROM " + String.join(",", indexName);
}

public void testTimestampFilterFromQuery() throws IOException {
int docsTest1 = 50;
int docsTest2 = 30;
indexTimestampData(docsTest1, "test1", "2024-11-26", "id1");
indexTimestampData(docsTest2, "test2", "2023-11-26", "id2");

// filter includes both indices in the result (all columns, all rows)
RestEsqlTestCase.RequestObjectBuilder builder = timestampFilter("gte", "2023-01-01").query("FROM test*");
RestEsqlTestCase.RequestObjectBuilder builder = timestampFilter("gte", "2023-01-01").query(from("test*"));
Map<String, Object> result = runEsql(builder);
assertMap(
result,
Expand All @@ -70,7 +76,7 @@ public void testTimestampFilterFromQuery() throws IOException {
);

// filter includes only test1. Columns from test2 are filtered out, as well (not only rows)!
builder = timestampFilter("gte", "2024-01-01").query("FROM test*");
builder = timestampFilter("gte", "2024-01-01").query(from("test*"));
assertMap(
runEsql(builder),
matchesMap().entry(
Expand All @@ -83,7 +89,7 @@ public void testTimestampFilterFromQuery() throws IOException {

// filter excludes both indices (no rows); the first analysis step fails because there are no columns, a second attempt succeeds
// after eliminating the index filter. All columns are returned.
builder = timestampFilter("gte", "2025-01-01").query("FROM test*");
builder = timestampFilter("gte", "2025-01-01").query(from("test*"));
assertMap(
runEsql(builder),
matchesMap().entry(
Expand All @@ -103,7 +109,7 @@ public void testFieldExistsFilter_KeepWildcard() throws IOException {
indexTimestampData(docsTest2, "test2", "2023-11-26", "id2");

// filter includes only test1. Columns and rows of test2 are filtered out
RestEsqlTestCase.RequestObjectBuilder builder = existsFilter("id1").query("FROM test*");
RestEsqlTestCase.RequestObjectBuilder builder = existsFilter("id1").query(from("test*"));
Map<String, Object> result = runEsql(builder);
assertMap(
result,
Expand All @@ -116,7 +122,7 @@ public void testFieldExistsFilter_KeepWildcard() throws IOException {
);

// filter includes only test1. Columns from test2 are filtered out, as well (not only rows)!
builder = existsFilter("id1").query("FROM test* METADATA _index | KEEP _index, id*");
builder = existsFilter("id1").query(from("test*") + " METADATA _index | KEEP _index, id*");
result = runEsql(builder);
assertMap(
result,
Expand All @@ -129,7 +135,7 @@ public void testFieldExistsFilter_KeepWildcard() throws IOException {
@SuppressWarnings("unchecked")
var values = (List<List<Object>>) result.get("values");
for (List<Object> row : values) {
assertThat(row.get(0), equalTo("test1"));
assertThat(row.get(0), oneOf("test1", "remote_cluster:test1"));
assertThat(row.get(1), instanceOf(Integer.class));
}
}
Expand All @@ -142,7 +148,7 @@ public void testFieldExistsFilter_With_ExplicitUseOfDiscardedIndexFields() throw

// test2 is explicitly used in a query with "SORT id2" even if the index filter should discard test2
RestEsqlTestCase.RequestObjectBuilder builder = existsFilter("id1").query(
"FROM test* METADATA _index | SORT id2 | KEEP _index, id*"
from("test*") + " METADATA _index | SORT id2 | KEEP _index, id*"
);
Map<String, Object> result = runEsql(builder);
assertMap(
Expand All @@ -157,7 +163,7 @@ public void testFieldExistsFilter_With_ExplicitUseOfDiscardedIndexFields() throw
@SuppressWarnings("unchecked")
var values = (List<List<Object>>) result.get("values");
for (List<Object> row : values) {
assertThat(row.get(0), equalTo("test1"));
assertThat(row.get(0), oneOf("test1", "remote_cluster:test1"));
assertThat(row.get(1), instanceOf(Integer.class));
assertThat(row.get(2), nullValue());
}
Expand All @@ -172,59 +178,59 @@ public void testFieldNameTypo() throws IOException {
// idx field name is explicitly used, though it doesn't exist in any of the indices. First test - without filter
ResponseException e = expectThrows(
ResponseException.class,
() -> runEsql(requestObjectBuilder().query("FROM test* | WHERE idx == 123"))
() -> runEsql(requestObjectBuilder().query(from("test*") + " | WHERE idx == 123"))
);
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("verification_exception"));
assertThat(e.getMessage(), containsString("Found 1 problem"));
assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]"));
assertThat(e.getMessage(), containsString("Unknown column [idx]"));

e = expectThrows(ResponseException.class, () -> runEsql(requestObjectBuilder().query("FROM test1 | WHERE idx == 123")));
e = expectThrows(ResponseException.class, () -> runEsql(requestObjectBuilder().query(from("test1") + " | WHERE idx == 123")));
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("verification_exception"));
assertThat(e.getMessage(), containsString("Found 1 problem"));
assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]"));
assertThat(e.getMessage(), containsString("Unknown column [idx]"));

e = expectThrows(
ResponseException.class,
() -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM test* | WHERE idx == 123"))
() -> runEsql(timestampFilter("gte", "2020-01-01").query(from("test*") + " | WHERE idx == 123"))
);
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("Found 1 problem"));
assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]"));
assertThat(e.getMessage(), containsString("Unknown column [idx]"));

e = expectThrows(
ResponseException.class,
() -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM test2 | WHERE idx == 123"))
() -> runEsql(timestampFilter("gte", "2020-01-01").query(from("test2") + " | WHERE idx == 123"))
);
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("Found 1 problem"));
assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]"));
assertThat(e.getMessage(), containsString("Unknown column [idx]"));
}

public void testIndicesDontExist() throws IOException {
int docsTest1 = 0; // we are interested only in the created index, not necessarily that it has data
indexTimestampData(docsTest1, "test1", "2024-11-26", "id1");

ResponseException e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo")));
ResponseException e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query(from("foo"))));
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("verification_exception"));
assertThat(e.getMessage(), containsString("Unknown index [foo]"));
assertThat(e.getMessage(), anyOf(containsString("Unknown index [foo]"), containsString("Unknown index [remote_cluster:foo]")));

e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo*")));
e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query(from("foo*"))));
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("verification_exception"));
assertThat(e.getMessage(), containsString("Unknown index [foo*]"));
assertThat(e.getMessage(), anyOf(containsString("Unknown index [foo*]"), containsString("Unknown index [remote_cluster:foo*]")));

e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo,test1")));
e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query(from("foo", "test1"))));
assertEquals(404, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("index_not_found_exception"));
assertThat(e.getMessage(), containsString("no such index [foo]"));
assertThat(e.getMessage(), anyOf(containsString("no such index [foo]"), containsString("no such index [remote_cluster:foo]")));

if (EsqlCapabilities.Cap.JOIN_LOOKUP_V10.isEnabled()) {
e = expectThrows(
ResponseException.class,
() -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM test1 | LOOKUP JOIN foo ON id1"))
() -> runEsql(timestampFilter("gte", "2020-01-01").query(from("test1") + " | LOOKUP JOIN foo ON id1"))
);
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("verification_exception"));
Expand All @@ -251,6 +257,11 @@ public Map<String, Object> runEsql(RestEsqlTestCase.RequestObjectBuilder request
}

protected void indexTimestampData(int docs, String indexName, String date, String differentiatorFieldName) throws IOException {
indexTimestampDataForClient(client(), docs, indexName, date, differentiatorFieldName);
}

protected void indexTimestampDataForClient(RestClient client, int docs, String indexName, String date, String differentiatorFieldName)
throws IOException {
Request createIndex = new Request("PUT", indexName);
createIndex.setJsonEntity("""
{
Expand All @@ -273,7 +284,7 @@ protected void indexTimestampData(int docs, String indexName, String date, Strin
}
}
}""".replace("%differentiator_field_name%", differentiatorFieldName));
Response response = client().performRequest(createIndex);
Response response = client.performRequest(createIndex);
assertThat(
entityToMap(response.getEntity(), XContentType.JSON),
matchesMap().entry("shards_acknowledged", true).entry("index", indexName).entry("acknowledged", true)
Expand All @@ -291,7 +302,7 @@ protected void indexTimestampData(int docs, String indexName, String date, Strin
bulk.addParameter("refresh", "true");
bulk.addParameter("filter_path", "errors");
bulk.setJsonEntity(b.toString());
response = client().performRequest(bulk);
response = client.performRequest(bulk);
Assert.assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8));
}
}
Expand Down

0 comments on commit 9d8504f

Please sign in to comment.