Skip to content

Commit

Permalink
Migrate Kafka to use BaseConnectorTest
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Feb 23, 2022
1 parent aebdb0a commit dd0b5dc
Showing 1 changed file with 189 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.Type;
import io.trino.testing.AbstractTestIntegrationSmokeTest;
import io.trino.testing.BaseConnectorTest;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.kafka.TestingKafka;
import io.trino.testing.sql.TestTable;
import io.trino.tpch.TpchTable;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -53,7 +57,9 @@
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static io.trino.spi.type.VarcharType.createVarcharType;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_TABLE_WITH_DATA;
import static io.trino.testing.assertions.Assert.assertEquals;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
Expand All @@ -62,10 +68,12 @@
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertFalse;

public class TestKafkaConnectorTest
// TODO extend BaseConnectorTest
extends AbstractTestIntegrationSmokeTest
extends BaseConnectorTest
{
private TestingKafka testingKafka;
private String rawFormatTopic;
Expand All @@ -76,6 +84,15 @@ public class TestKafkaConnectorTest
private static final String JSON_MILLISECONDS_TABLE_NAME = "milliseconds_since_epoch_table";
private static final String JSON_SECONDS_TABLE_NAME = "seconds_since_epoch_table";

// These tables must not be reused because the data will be modified during tests
private static final SchemaTableName TABLE_INSERT_NEGATIVE_DATE = new SchemaTableName("write_test", "test_insert_negative_date_" + randomTableSuffix());
private static final SchemaTableName TABLE_INSERT_CUSTOMER = new SchemaTableName("write_test", "test_insert_customer_" + randomTableSuffix());
private static final SchemaTableName TABLE_INSERT_ARRAY = new SchemaTableName("write_test", "test_insert_array_" + randomTableSuffix());
private static final SchemaTableName TABLE_INSERT_UNICODE_1 = new SchemaTableName("write_test", "test_unicode_1_" + randomTableSuffix());
private static final SchemaTableName TABLE_INSERT_UNICODE_2 = new SchemaTableName("write_test", "test_unicode_2_" + randomTableSuffix());
private static final SchemaTableName TABLE_INSERT_UNICODE_3 = new SchemaTableName("write_test", "test_unicode_3_" + randomTableSuffix());
private static final SchemaTableName TABLE_INSERT_HIGHEST_UNICODE = new SchemaTableName("write_test", "test_highest_unicode_" + randomTableSuffix());

@Override
protected QueryRunner createQueryRunner()
throws Exception
Expand All @@ -102,6 +119,34 @@ protected QueryRunner createQueryRunner()
.put(new SchemaTableName("default", headersTopic),
new KafkaTopicDescription(headersTopic, Optional.empty(), headersTopic, Optional.empty(), Optional.empty()))
.putAll(createJsonDateTimeTestTopic())
.put(TABLE_INSERT_NEGATIVE_DATE, createDescription(
TABLE_INSERT_NEGATIVE_DATE,
createOneFieldDescription("key", BIGINT),
ImmutableList.of(createOneFieldDescription("dt", DATE, ISO8601.toString()))))
.put(TABLE_INSERT_CUSTOMER, createDescription(
TABLE_INSERT_CUSTOMER,
createOneFieldDescription("phone", createVarcharType(15)),
ImmutableList.of(createOneFieldDescription("custkey", BIGINT), createOneFieldDescription("acctbal", DOUBLE))))
.put(TABLE_INSERT_ARRAY, createDescription(
TABLE_INSERT_ARRAY,
createOneFieldDescription("a", new ArrayType(DOUBLE)),
ImmutableList.of(createOneFieldDescription("b", new ArrayType(DOUBLE)))))
.put(TABLE_INSERT_UNICODE_1, createDescription(
TABLE_INSERT_UNICODE_1,
createOneFieldDescription("key", BIGINT),
ImmutableList.of(createOneFieldDescription("test", createVarcharType(50)))))
.put(TABLE_INSERT_UNICODE_2, createDescription(
TABLE_INSERT_UNICODE_2,
createOneFieldDescription("key", BIGINT),
ImmutableList.of(createOneFieldDescription("test", createVarcharType(50)))))
.put(TABLE_INSERT_UNICODE_3, createDescription(
TABLE_INSERT_UNICODE_3,
createOneFieldDescription("key", BIGINT),
ImmutableList.of(createOneFieldDescription("test", createVarcharType(50)))))
.put(TABLE_INSERT_HIGHEST_UNICODE, createDescription(
TABLE_INSERT_HIGHEST_UNICODE,
createOneFieldDescription("key", BIGINT),
ImmutableList.of(createOneFieldDescription("test", createVarcharType(50)))))
.buildOrThrow();

QueryRunner queryRunner = KafkaQueryRunner.builder(testingKafka)
Expand All @@ -112,6 +157,33 @@ protected QueryRunner createQueryRunner()
return queryRunner;
}

@Override
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
switch (connectorBehavior) {
case SUPPORTS_ADD_COLUMN:
case SUPPORTS_DROP_COLUMN:
case SUPPORTS_CREATE_SCHEMA:
case SUPPORTS_CREATE_TABLE:
case SUPPORTS_CREATE_TABLE_WITH_DATA:
case SUPPORTS_DELETE:
case SUPPORTS_COMMENT_ON_TABLE:
case SUPPORTS_COMMENT_ON_COLUMN:
case SUPPORTS_RENAME_TABLE:
case SUPPORTS_RENAME_COLUMN:
case SUPPORTS_TOPN_PUSHDOWN:
return false;
default:
return super.hasBehavior(connectorBehavior);
}
}

@Override
protected TestTable createTableWithDefaultColumns()
{
throw new SkipException("Kafka connector does not support column default values");
}

@Test
public void testColumnReferencedTwice()
{
Expand Down Expand Up @@ -288,6 +360,110 @@ public void testReadAllDataTypes()
")");
}

@Test
@Override
public void testInsert()
{
// Override because the base test uses CREATE TABLE AS SELECT statement that is unsupported in Kafka connector
assertFalse(hasBehavior(SUPPORTS_CREATE_TABLE_WITH_DATA));

String query = "SELECT phone, custkey, acctbal FROM customer";

assertQuery("SELECT count(*) FROM " + TABLE_INSERT_CUSTOMER + "", "SELECT 0");

assertUpdate("INSERT INTO " + TABLE_INSERT_CUSTOMER + " " + query, "SELECT count(*) FROM customer");
assertQuery("SELECT * FROM " + TABLE_INSERT_CUSTOMER + "", query);

assertUpdate("INSERT INTO " + TABLE_INSERT_CUSTOMER + " (custkey) VALUES (-1)", 1);
assertUpdate("INSERT INTO " + TABLE_INSERT_CUSTOMER + " (custkey) VALUES (null)", 1);
assertUpdate("INSERT INTO " + TABLE_INSERT_CUSTOMER + " (phone) VALUES ('3283-2001-01-01')", 1);
assertUpdate("INSERT INTO " + TABLE_INSERT_CUSTOMER + " (custkey, phone) VALUES (-2, '3283-2001-01-02')", 1);
assertUpdate("INSERT INTO " + TABLE_INSERT_CUSTOMER + " (phone, custkey) VALUES ('3283-2001-01-03', -3)", 1);
assertUpdate("INSERT INTO " + TABLE_INSERT_CUSTOMER + " (acctbal) VALUES (1234)", 1);

assertQuery("SELECT * FROM " + TABLE_INSERT_CUSTOMER + "", query
+ " UNION ALL SELECT null, -1, null"
+ " UNION ALL SELECT null, null, null"
+ " UNION ALL SELECT '3283-2001-01-01', null, null"
+ " UNION ALL SELECT '3283-2001-01-02', -2, null"
+ " UNION ALL SELECT '3283-2001-01-03', -3, null"
+ " UNION ALL SELECT null, null, 1234");

// UNION query produces columns in the opposite order
// of how they are declared in the table schema
assertUpdate(
"INSERT INTO " + TABLE_INSERT_CUSTOMER + " (custkey, phone, acctbal) " +
"SELECT custkey, phone, acctbal FROM customer " +
"UNION ALL " +
"SELECT custkey, phone, acctbal FROM customer",
"SELECT 2 * count(*) FROM customer");
}

@Test
@Override
public void testInsertNegativeDate()
{
// Override because the base test uses CREATE TABLE statement that is unsupported in Kafka connector
assertQueryReturnsEmptyResult("SELECT dt FROM " + TABLE_INSERT_NEGATIVE_DATE);
assertUpdate(format("INSERT INTO %s (dt) VALUES (DATE '-0001-01-01')", TABLE_INSERT_NEGATIVE_DATE), 1);
assertQuery("SELECT dt FROM " + TABLE_INSERT_NEGATIVE_DATE, "VALUES date '-0001-01-01'");
assertQuery(format("SELECT dt FROM %s WHERE dt = date '-0001-01-01'", TABLE_INSERT_NEGATIVE_DATE), "VALUES date '-0001-01-01'");
}

@Test
@Override
public void testInsertArray()
{
// Override because the base test uses CREATE TABLE statement that is unsupported in Kafka connector
assertThatThrownBy(() -> query("INSERT INTO " + TABLE_INSERT_ARRAY + " (a) VALUES (ARRAY[null])"))
.hasMessage("Unsupported column type 'array(double)' for column 'a'");
throw new SkipException("not supported");
}

@Test
@Override
public void testInsertUnicode()
{
// Override because the base test uses CREATE TABLE statement that is unsupported in Kafka connector
assertUpdate("INSERT INTO " + TABLE_INSERT_UNICODE_1 + "(test) VALUES 'Hello', U&'hello\\6d4B\\8Bd5world\\7F16\\7801' ", 2);
assertThat(computeActual("SELECT test FROM " + TABLE_INSERT_UNICODE_1).getOnlyColumnAsSet())
.containsExactlyInAnyOrder("Hello", "hello测试world编码");

assertUpdate("INSERT INTO " + TABLE_INSERT_UNICODE_2 + "(test) VALUES 'aa', 'bé'", 2);
assertQuery("SELECT test FROM " + TABLE_INSERT_UNICODE_2, "VALUES 'aa', 'bé'");
assertQuery("SELECT test FROM " + TABLE_INSERT_UNICODE_2 + " WHERE test = 'aa'", "VALUES 'aa'");
assertQuery("SELECT test FROM " + TABLE_INSERT_UNICODE_2 + " WHERE test > 'ba'", "VALUES 'bé'");
assertQuery("SELECT test FROM " + TABLE_INSERT_UNICODE_2 + " WHERE test < 'ba'", "VALUES 'aa'");
assertQueryReturnsEmptyResult("SELECT test FROM " + TABLE_INSERT_UNICODE_2 + " WHERE test = 'ba'");

assertUpdate("INSERT INTO " + TABLE_INSERT_UNICODE_3 + "(test) VALUES 'a', 'é'", 2);
assertQuery("SELECT test FROM " + TABLE_INSERT_UNICODE_3, "VALUES 'a', 'é'");
assertQuery("SELECT test FROM " + TABLE_INSERT_UNICODE_3 + " WHERE test = 'a'", "VALUES 'a'");
assertQuery("SELECT test FROM " + TABLE_INSERT_UNICODE_3 + " WHERE test > 'b'", "VALUES 'é'");
assertQuery("SELECT test FROM " + TABLE_INSERT_UNICODE_3 + " WHERE test < 'b'", "VALUES 'a'");
assertQueryReturnsEmptyResult("SELECT test FROM " + TABLE_INSERT_UNICODE_3 + " WHERE test = 'b'");
}

@Test
@Override
public void testInsertHighestUnicodeCharacter()
{
// Override because the base test uses CREATE TABLE statement that is unsupported in Kafka connector
assertUpdate("INSERT INTO " + TABLE_INSERT_HIGHEST_UNICODE + "(test) VALUES 'Hello', U&'hello\\6d4B\\8Bd5\\+10FFFFworld\\7F16\\7801' ", 2);
assertThat(computeActual("SELECT test FROM " + TABLE_INSERT_HIGHEST_UNICODE).getOnlyColumnAsSet())
.containsExactlyInAnyOrder("Hello", "hello测试􏿿world编码");
}

private static KafkaTopicDescription createDescription(SchemaTableName schemaTableName, KafkaTopicFieldDescription key, List<KafkaTopicFieldDescription> fields)
{
return new KafkaTopicDescription(
schemaTableName.getTableName(),
Optional.of(schemaTableName.getSchemaName()),
schemaTableName.getTableName(),
Optional.of(new KafkaTopicFieldGroup("json", Optional.empty(), Optional.empty(), ImmutableList.of(key))),
Optional.of(new KafkaTopicFieldGroup("json", Optional.empty(), Optional.empty(), fields)));
}

private static KafkaTopicDescription createDescription(String name, String schema, String topic, Optional<KafkaTopicFieldGroup> message)
{
return new KafkaTopicDescription(name, Optional.of(schema), topic, Optional.empty(), message);
Expand All @@ -298,6 +474,16 @@ private static Optional<KafkaTopicFieldGroup> createFieldGroup(String dataFormat
return Optional.of(new KafkaTopicFieldGroup(dataFormat, Optional.empty(), Optional.empty(), fields));
}

private static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type)
{
return new KafkaTopicFieldDescription(name, type, name, null, null, null, false);
}

private static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String dataFormat)
{
return new KafkaTopicFieldDescription(name, type, name, null, dataFormat, null, false);
}

private static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String dataFormat, Optional<String> formatHint)
{
return formatHint.map(s -> new KafkaTopicFieldDescription(name, type, name, null, dataFormat, s, false))
Expand Down

0 comments on commit dd0b5dc

Please sign in to comment.