Skip to content

Commit

Permalink
Migrate Elasticsearch tests to testcontainers
Browse files Browse the repository at this point in the history
Cherry-pick of
trinodb/trino@bd4b3dd

Co-authored-by: Martin Traverso <mtraverso@gmail.com>
  • Loading branch information
2 people authored and zhenxiao committed Dec 20, 2022
1 parent 03938b4 commit 881a616
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 496 deletions.
45 changes: 39 additions & 6 deletions presto-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,6 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${dep.elasticsearch.version}</version>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
Expand Down Expand Up @@ -249,6 +243,33 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-client</artifactId>
Expand Down Expand Up @@ -327,6 +348,18 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${dep.elasticsearch.version}</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>jna</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import com.facebook.presto.tests.ResultsSession;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
Expand All @@ -42,25 +43,24 @@
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.client.Requests.flushRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class ElasticsearchLoader
extends AbstractTestingPrestoClient<Void>
{
private final String tableName;
private final Client client;
RestHighLevelClient restClient;

public ElasticsearchLoader(
Client client,
RestHighLevelClient client,
String tableName,
TestingPrestoServer prestoServer,
Session defaultSession)
{
super(prestoServer, defaultSession);

this.tableName = requireNonNull(tableName, "tableName is null");
this.client = requireNonNull(client, "client is null");
this.restClient = requireNonNull(client, "client is null");
}

@Override
Expand Down Expand Up @@ -108,8 +108,13 @@ public void addResults(QueryStatusInfo statusInfo, QueryData data)
throw new UncheckedIOException("Error loading data into Elasticsearch index: " + tableName, e);
}
}
client.bulk(request).actionGet();
client.admin().indices().flush(flushRequest(tableName)).actionGet();
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
restClient.bulk(request);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import com.facebook.presto.tests.TestingPrestoClient;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import io.airlift.tpch.TpchTable;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import java.util.Map;

Expand All @@ -42,9 +46,10 @@ private ElasticsearchQueryRunner() {}
private static final String TPCH_SCHEMA = "tpch";
private static final int NODE_COUNT = 2;

public static DistributedQueryRunner createElasticsearchQueryRunner(EmbeddedElasticsearchNode embeddedElasticsearchNode, Iterable<TpchTable<?>> tables)
public static DistributedQueryRunner createElasticsearchQueryRunner(HostAndPort address, Iterable<TpchTable<?>> tables)
throws Exception
{
RestHighLevelClient client = null;
DistributedQueryRunner queryRunner = null;
try {
queryRunner = DistributedQueryRunner.builder(createSession())
Expand All @@ -54,36 +59,40 @@ public static DistributedQueryRunner createElasticsearchQueryRunner(EmbeddedElas
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

embeddedElasticsearchNode.start();

TestingElasticsearchConnectorFactory testFactory = new TestingElasticsearchConnectorFactory();

installElasticsearchPlugin(queryRunner, testFactory);
installElasticsearchPlugin(address, queryRunner, testFactory);

TestingPrestoClient prestoClient = queryRunner.getRandomClient();

LOG.info("Loading data...");

client = new RestHighLevelClient(RestClient.builder(HttpHost.create(address.toString())));

long startTime = System.nanoTime();
for (TpchTable<?> table : tables) {
loadTpchTopic(embeddedElasticsearchNode, prestoClient, table);
loadTpchTopic(client, prestoClient, table);
}
LOG.info("Loading complete in %s", nanosSince(startTime).toString(SECONDS));

return queryRunner;
}
catch (Exception e) {
closeAllSuppress(e, queryRunner, embeddedElasticsearchNode);
closeAllSuppress(e, queryRunner, client);
throw e;
}
}

private static void installElasticsearchPlugin(QueryRunner queryRunner, TestingElasticsearchConnectorFactory factory)
private static void installElasticsearchPlugin(HostAndPort address, QueryRunner queryRunner, TestingElasticsearchConnectorFactory factory)
throws Exception
{
queryRunner.installPlugin(new ElasticsearchPlugin(factory));
Map<String, String> config = ImmutableMap.<String, String>builder()
.put("elasticsearch.host", "localhost")
.put("elasticsearch.port", "9200")
.put("elasticsearch.host", address.getHost())
.put("elasticsearch.port", Integer.toString(address.getPort()))
// Node discovery relies on the publish_address exposed via the Elasticseach API
// This doesn't work well within a docker environment that maps ES's port to a random public port
.put("elasticsearch.ignore-publish-address", "true")
.put("elasticsearch.default-schema-name", TPCH_SCHEMA)
.put("elasticsearch.scroll-size", "1000")
.put("elasticsearch.scroll-timeout", "1m")
Expand All @@ -94,11 +103,11 @@ private static void installElasticsearchPlugin(QueryRunner queryRunner, TestingE
queryRunner.createCatalog("elasticsearch", "elasticsearch", config);
}

private static void loadTpchTopic(EmbeddedElasticsearchNode embeddedElasticsearchNode, TestingPrestoClient prestoClient, TpchTable<?> table)
private static void loadTpchTopic(RestHighLevelClient client, TestingPrestoClient prestoClient, TpchTable<?> table)
{
long start = System.nanoTime();
LOG.info("Running import for %s", table.getTableName());
ElasticsearchLoader loader = new ElasticsearchLoader(embeddedElasticsearchNode.getClient(), table.getTableName().toLowerCase(ENGLISH), prestoClient.getServer(), prestoClient.getDefaultSession());
ElasticsearchLoader loader = new ElasticsearchLoader(client, table.getTableName().toLowerCase(ENGLISH), prestoClient.getServer(), prestoClient.getDefaultSession());
loader.execute(format("SELECT * from %s", new QualifiedObjectName(TPCH_SCHEMA, TINY_SCHEMA_NAME, table.getTableName().toLowerCase(ENGLISH))));
LOG.info("Imported %s in %s", table.getTableName(), nanosSince(start).convertToMostSuccinctTimeUnit());
}
Expand All @@ -112,8 +121,8 @@ public static void main(String[] args)
throws Exception
{
Logging.initialize();
DistributedQueryRunner queryRunner = createElasticsearchQueryRunner(EmbeddedElasticsearchNode.createEmbeddedElasticsearchNode(), TpchTable.getTables());
Thread.sleep(10);
HostAndPort address = HostAndPort.fromParts("localhost", 9200);
DistributedQueryRunner queryRunner = createElasticsearchQueryRunner(address, TpchTable.getTables());
Logger log = Logger.get(ElasticsearchQueryRunner.class);
log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.elasticsearch;

import com.google.common.net.HostAndPort;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticsearchServer
{
private final ElasticsearchContainer container;

public ElasticsearchServer()
{
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:6.0.0");
container.start();
}

public void stop()
{
container.close();
}

public HostAndPort getAddress()
{
return HostAndPort.fromString(container.getHttpHostAddress());
}
}

This file was deleted.

Loading

0 comments on commit 881a616

Please sign in to comment.