Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2][Jdbc] Add oceanbase dialect factory #4989

Merged
merged 32 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
aa129ef
[Feature][Connector-V2][OceanBase]OceanBase source and sink connector
silenceland Jan 15, 2023
49cd2b2
Merge branch 'dev' into feature/connector-V2-oceanbase
silenceland Jan 15, 2023
1dba48a
Merge branch 'dev' into feature/connector-V2-oceanbase
silenceland Mar 19, 2023
51b0901
[Feature][Connector-V2][OceanBase]mvn spotless:apply to fix violations
silenceland Mar 19, 2023
0830d39
Merge branch 'dev' into feature-connector-v2-oceanbase
silenceland Mar 25, 2023
273a9da
[Feature][Connector-V2][OceanBase]add oceanbase conf
silenceland Apr 16, 2023
a0c5c19
Merge branch 'dev' into feature-connector-v2-oceanbase
silenceland Apr 16, 2023
a18844f
[Feature][Connector-V2][OceanBase]fix column float or double
silenceland Apr 17, 2023
7e881a3
[Feature][Jdbc-Connector] Add OceanBase Connector
changhuyan Apr 20, 2023
6942728
[Feature][Jdbc-Connector]Add Oceanbase Connector
changhuyan Apr 20, 2023
fc2b53a
Merge branch 'dev' into OceanBase-Connector
changhuyan Apr 20, 2023
4feacd7
[Feature][Jdbc-Connector]
changhuyan Apr 21, 2023
c2b59a7
[Feature][Jdbc-Connector] Add java.util.*
changhuyan Apr 21, 2023
f007a2b
changhuyan [Feature][Jdbc-Connector] Modify ci and code style error
changhuyan Apr 23, 2023
e9c4289
[Feature][Jdbc-Connector] Add e2e testcase
changhuyan Apr 24, 2023
cd2171e
[Feature][Jdbc-Connector] Add e2etest License
changhuyan Apr 25, 2023
18a5772
Merge branch 'dev' into feature-connector-v2-oceanbase
silenceland Jun 7, 2023
9c11006
Merge branch 'dev' into feature-connector-v2-oceanbase
silenceland Jun 18, 2023
adcd203
[Feature][Connector-V2][OceanBase]spotless apply
silenceland Jun 18, 2023
d4b5952
fix conflicts
whhe Jun 28, 2023
6c930b6
Merge branch 'dev' into ob-dialect
whhe Jun 28, 2023
b23df0f
merge dialect factory class and revert unnecessary changes
whhe Jun 28, 2023
3ee6054
rename IT files
whhe Jun 28, 2023
3d8dd7f
update IT cases based on existing mysql/oracle cases
whhe Jun 28, 2023
7c70f04
add ob in docs
whhe Jun 28, 2023
168b1b3
disable obmysql IT case
whhe Jun 29, 2023
558e41b
use compatible_mode instead of driver_type and add ob doc
whhe Jul 3, 2023
50b1c3b
Merge branch 'dev' into ob-dialect
whhe Jul 7, 2023
a19e2a2
comments addressed
whhe Jul 7, 2023
6a4ae96
add compare result in ut
whhe Jul 7, 2023
465d9e5
set pulsar container timeout to 3 min
whhe Jul 7, 2023
854e9a9
fix comments
whhe Jul 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
| user | String | No | - |
| password | String | No | - |
| query | String | No | - |
| driver_type | String | No | - |
whhe marked this conversation as resolved.
Show resolved Hide resolved
| database | String | No | - |
| table | String | No | - |
| primary_keys | Array | No | - |
Expand Down Expand Up @@ -69,6 +70,10 @@ The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/tes

Use this sql write upstream input datas to database. e.g `INSERT ...`

### driver_type [string]

Use this field to represent the OceanBase driver. e.g 'mysql'

### database [string]

Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
Expand Down Expand Up @@ -168,6 +173,7 @@ there are some reference value for params above.
| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |
| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://<account_name>.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc |
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar |

## Example

Expand Down
6 changes: 6 additions & 0 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ supports query SQL and can achieve projection effect.
| user | String | No | - |
| password | String | No | - |
| query | String | Yes | - |
| driver_type | String | Yes | - |
| connection_check_timeout_sec | Int | No | 30 |
| partition_column | String | No | - |
| partition_upper_bound | Long | No | - |
Expand Down Expand Up @@ -63,6 +64,10 @@ The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/tes

Query statement

### driver_type [string]

Use this field to represent the OceanBase driver. e.g 'mysql'

### connection_check_timeout_sec [int]

The time in seconds to wait for the database operation used to validate the connection to complete.
Expand Down Expand Up @@ -120,6 +125,7 @@ there are some reference value for params above.
| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://<account_name>.snowflakecomputing.com | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc |
| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000 | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar |

## Example

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class JdbcConnectionConfig implements Serializable {

public String url;
public String driverName;
public String driverType;
public int connectionCheckTimeoutSeconds =
JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
public int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
Expand All @@ -48,6 +49,7 @@ public class JdbcConnectionConfig implements Serializable {
public static JdbcConnectionConfig of(ReadonlyConfig config) {
JdbcConnectionConfig.Builder builder = JdbcConnectionConfig.builder();
builder.url(config.get(JdbcOptions.URL));
builder.driverType(config.get(JdbcOptions.DRIVER_TYPE));
builder.driverName(config.get(JdbcOptions.DRIVER));
builder.autoCommit(config.get(JdbcOptions.AUTO_COMMIT));
builder.maxRetries(config.get(JdbcOptions.MAX_RETRIES));
Expand All @@ -73,6 +75,10 @@ public String getDriverName() {
return driverName;
}

public String getDriverType() {
return driverType;
}

public boolean isAutoCommit() {
return autoCommit;
}
Expand Down Expand Up @@ -120,6 +126,7 @@ public static JdbcConnectionConfig.Builder builder() {
public static final class Builder {
private String url;
private String driverName;
private String driverType;
private int connectionCheckTimeoutSeconds =
JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
private int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
Expand All @@ -145,6 +152,11 @@ public Builder driverName(String driverName) {
return this;
}

public Builder driverType(String driverType) {
this.driverType = driverType;
return this;
}

public Builder connectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) {
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
return this;
Expand Down Expand Up @@ -205,6 +217,7 @@ public JdbcConnectionConfig build() {
jdbcConnectionConfig.batchSize = this.batchSize;
jdbcConnectionConfig.batchIntervalMs = this.batchIntervalMs;
jdbcConnectionConfig.driverName = this.driverName;
jdbcConnectionConfig.driverType = this.driverType;
jdbcConnectionConfig.maxRetries = this.maxRetries;
jdbcConnectionConfig.password = this.password;
jdbcConnectionConfig.connectionCheckTimeoutSeconds = this.connectionCheckTimeoutSeconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public interface JdbcOptions {
.intType()
.defaultValue(30)
.withDescription("connection check time second");
Option<String> DRIVER_TYPE =
Options.key("driver_type").stringType().noDefaultValue().withDescription("driver_type");

Option<Integer> MAX_RETRIES =
Options.key("max_retries").intType().defaultValue(0).withDescription("max_retired");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class JdbcSourceConfig implements Serializable {

private JdbcConnectionConfig jdbcConnectionConfig;
public String query;
public String driverType;
private String partitionColumn;
private BigDecimal partitionUpperBound;
private BigDecimal partitionLowerBound;
Expand All @@ -44,6 +45,7 @@ public static JdbcSourceConfig of(ReadonlyConfig config) {
builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config));
builder.query(config.get(JdbcOptions.QUERY));
builder.fetchSize(config.get(JdbcOptions.FETCH_SIZE));
config.getOptional(JdbcOptions.DRIVER_TYPE).ifPresent(builder::driverType);
config.getOptional(JdbcOptions.PARTITION_COLUMN).ifPresent(builder::partitionColumn);
config.getOptional(JdbcOptions.PARTITION_UPPER_BOUND)
.ifPresent(builder::partitionUpperBound);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,14 @@ public interface JdbcDialectFactory {

/** @return Creates a new instance of the {@link JdbcDialect}. */
JdbcDialect create();

/**
* Create a {@link JdbcDialect} instance based on the driver type.
whhe marked this conversation as resolved.
Show resolved Hide resolved
*
* @param driverType The driver type
* @return a new instance of {@link JdbcDialect}
*/
default JdbcDialect create(String driverType) {
return create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ private JdbcDialectLoader() {}
* Loads the unique JDBC Dialect that can handle the given database url.
*
* @param url A database URL.
* @param driverType The driver type.
* @throws IllegalStateException if the loader cannot find exactly one dialect that can
* unambiguously process the given database URL.
* @return The loaded dialect.
*/
public static JdbcDialect load(String url) {
public static JdbcDialect load(String url, String driverType) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
List<JdbcDialectFactory> foundFactories = discoverFactories(cl);

Expand Down Expand Up @@ -89,7 +90,7 @@ public static JdbcDialect load(String url) {
.collect(Collectors.joining("\n"))));
}

return matchingFactories.get(0).create();
return matchingFactories.get(0).create(driverType);
}

private static List<JdbcDialectFactory> discoverFactories(ClassLoader classLoader) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;

import com.google.auto.service.AutoService;

import javax.annotation.Nonnull;

@AutoService(JdbcDialectFactory.class)
public class OceanBaseDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:oceanbase:");
}

@Override
public JdbcDialect create() {
throw new UnsupportedOperationException(
"Can't create JdbcDialect without driver type for OceanBase");
}

@Override
public JdbcDialect create(@Nonnull String driverType) {
if ("oracle".equalsIgnoreCase(driverType)) {
return new OracleDialect();
}
return new MysqlDialect();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ public String getPluginName() {
public void prepare(Config pluginConfig) throws PrepareFailException {
this.config = ReadonlyConfig.fromConfig(pluginConfig);
this.jdbcSinkConfig = JdbcSinkConfig.of(config);
this.dialect = JdbcDialectLoader.load(jdbcSinkConfig.getJdbcConnectionConfig().getUrl());
this.dialect =
JdbcDialectLoader.load(
jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
jdbcSinkConfig.getJdbcConnectionConfig().getDriverType());
this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ public TableSink createSink(TableFactoryContext context) {
}
final ReadonlyConfig options = config;
JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
JdbcDialect dialect = JdbcDialectLoader.load(sinkConfig.getJdbcConnectionConfig().getUrl());
JdbcDialect dialect =
JdbcDialectLoader.load(
sinkConfig.getJdbcConnectionConfig().getUrl(),
sinkConfig.getJdbcConnectionConfig().getDriverType());
return () ->
new JdbcSink(
options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
new SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
this.query = jdbcSourceConfig.getQuery();
this.jdbcDialect =
JdbcDialectLoader.load(jdbcSourceConfig.getJdbcConnectionConfig().getUrl());
JdbcDialectLoader.load(
jdbcSourceConfig.getJdbcConnectionConfig().getUrl(),
jdbcSourceConfig.getJdbcConnectionConfig().getDriverType());
try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) {
this.typeInfo = initTableField(connection);
this.partitionParameter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
JdbcConnectionProvider connectionProvider =
new SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig());
final String querySql = config.getQuery();
JdbcDialect dialect = JdbcDialectLoader.load(config.getJdbcConnectionConfig().getUrl());
JdbcDialect dialect =
whhe marked this conversation as resolved.
Show resolved Hide resolved
JdbcDialectLoader.load(
config.getJdbcConnectionConfig().getUrl(),
config.getJdbcConnectionConfig().getDriverType());
TableSchema tableSchema = catalogTable.getTableSchema();
SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
Optional<PartitionParameter> partitionParameter =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import org.apache.commons.lang3.tuple.Pair;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class JdbcOceanBaseITBase extends AbstractJdbcIT {

private static final String OCEANBASE_DATABASE = "seatunnel";
private static final String OCEANBASE_SOURCE = "source";
private static final String OCEANBASE_SINK = "sink";

private static final String OCEANBASE_JDBC_TEMPLATE = "jdbc:oceanbase://" + HOST + ":%s";
private static final String OCEANBASE_DRIVER_CLASS = "com.oceanbase.jdbc.Driver";

abstract String imageName();

abstract String host();

abstract int port();

abstract String username();

abstract String password();

abstract List<String> configFile();

abstract String createSqlTemplate();

@Override
JdbcCase getJdbcCase() {
Map<String, String> containerEnv = new HashMap<>();
String jdbcUrl = String.format(OCEANBASE_JDBC_TEMPLATE, port());
Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
String[] fieldNames = testDataSet.getKey();

String insertSql = insertTable(OCEANBASE_DATABASE, OCEANBASE_SOURCE, fieldNames);

return JdbcCase.builder()
.dockerImage(imageName())
.networkAliases(host())
.containerEnv(containerEnv)
.driverClass(OCEANBASE_DRIVER_CLASS)
.host(HOST)
.port(port())
.localPort(port())
.jdbcTemplate(OCEANBASE_JDBC_TEMPLATE)
.jdbcUrl(jdbcUrl)
.userName(username())
.password(password())
.database(OCEANBASE_DATABASE)
.sourceTable(OCEANBASE_SOURCE)
.sinkTable(OCEANBASE_SINK)
.createSql(createSqlTemplate())
.configFile(configFile())
.insertSql(insertSql)
.testData(testDataSet)
.build();
}

@Override
void compareResult() {}

@Override
String driverUrl() {
return "https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar";
}

@Override
protected void createSchemaIfNeeded() {
String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE;
try {
connection.prepareStatement(sql).executeUpdate();
} catch (Exception e) {
throw new SeaTunnelRuntimeException(
JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql " + sql, e);
}
}
}
Loading