-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature][connector-v2] add tablestore source and sink #3309
Merged
Merged
Changes from 21 commits
Commits
Show all changes
33 commits
Select commit
Hold shift + click to select a range
e94a14d
add tablestore
liugddx d036fc6
fix unable to release link problem
liugddx 77c9ad4
remove schema
liugddx 77dd396
add doc
liugddx cbf1a6d
fix convert error
liugddx 29c6897
fix deadlink error
liugddx 6e608a0
fix code style error
liugddx 53b5abb
fix code style error
liugddx 8359747
revert table sink
liugddx 0785f3b
fix deadlink
liugddx 0de05f5
add tablestore sink
liugddx 7a0593f
Merge remote-tracking branch 'upstream/dev' into feature-tablestore
liugddx 72f75d4
fix type error
liugddx 51a56a7
add tablestore sink
liugddx b736f23
fix tablestore sink bug
liugddx 448a619
fix tablestore sink bug
liugddx a86225b
Merge remote-tracking branch 'upstream/dev' into feature-tablestore
liugddx f594236
Merge remote-tracking branch 'upstream/dev' into feature-tablestore
liugddx 40d2018
fix cr error
liugddx 5896145
fix cr error
liugddx d5a839a
fix cr error
liugddx 7e84bd7
add optional params
liugddx adfc22e
Merge remote-tracking branch 'upstream/dev' into feature-tablestore
liugddx 5df8159
fix cr error
liugddx de19951
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx 1be80a9
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx 3e5c3eb
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx f2e4c6b
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx c2883a5
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx 557fed0
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx 6ff5e15
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx d76fa8a
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx ec0786c
fix cr error
liugddx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
# Tablestore | ||
|
||
> Tablestore sink connector | ||
|
||
## Description | ||
|
||
Write data to `Tablestore` | ||
|
||
## Key features | ||
|
||
- [ ] [exactly-once](../../concept/connector-v2-features.md) | ||
- [ ] [schema projection](../../concept/connector-v2-features.md) | ||
|
||
## Options | ||
|
||
| name | type | required | default value | | ||
|----------------- | ------ |----------| ------------- | | ||
| end_point | string | yes | - | | ||
| instance_name | string | yes | - | | ||
| access_key_id | string | yes | - | | ||
| access_key_secret| string | yes | - | | ||
| table | string | yes | - | | ||
| primary_keys | array | yes | - | | ||
| batch_size | string | no | 25 | | ||
| batch_interval_ms| string | no | 1000 | | ||
| common-options | config | no | - | | ||
|
||
### end_point [string] | ||
|
||
endPoint to write to Tablestore. | ||
|
||
### instanceName [string] | ||
|
||
The instanceName of Tablestore. | ||
|
||
### access_key_id [string] | ||
|
||
The access id of Tablestore. | ||
|
||
### access_key_secret [string] | ||
|
||
The access secret of Tablestore. | ||
|
||
### table [string] | ||
|
||
The table of Tablestore. | ||
|
||
### primaryKeys [array] | ||
|
||
The primaryKeys of Tablestore. | ||
|
||
### common options [ config ] | ||
|
||
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. | ||
|
||
## Example | ||
|
||
```bash | ||
Tablestore { | ||
end_point = "xxxx" | ||
instance_name = "xxxx" | ||
access_key_id = "xxxx" | ||
access_key_secret = "xxxx" | ||
table = "sink" | ||
primary_keys = ["pk_1","pk_2","pk_3","pk_4"] | ||
} | ||
``` | ||
|
||
## Changelog | ||
|
||
### next version | ||
|
||
- Add Tablestore Sink Connector | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
50 changes: 50 additions & 0 deletions
50
...he/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore; | ||
|
||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; | ||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; | ||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; | ||
|
||
import java.sql.Connection; | ||
import java.sql.PreparedStatement; | ||
import java.sql.SQLException; | ||
|
||
public class TablestoreDialect implements JdbcDialect { | ||
@Override | ||
public String dialectName() { | ||
return "Tablestore"; | ||
} | ||
|
||
@Override | ||
public JdbcRowConverter getRowConverter() { | ||
return new TablestoreJdbcRowConverter(); | ||
} | ||
|
||
@Override | ||
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { | ||
return new TablestoreTypeMapper(); | ||
} | ||
|
||
@Override | ||
public PreparedStatement creatPreparedStatement(Connection connection, String queryTemplate, int fetchSize) throws SQLException { | ||
PreparedStatement statement = connection.prepareStatement(queryTemplate); | ||
statement.setFetchSize(fetchSize); | ||
return statement; | ||
} | ||
} |
40 changes: 40 additions & 0 deletions
40
...unnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore; | ||
|
||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; | ||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; | ||
|
||
import com.google.auto.service.AutoService; | ||
|
||
/** | ||
* Factory for {@link TablestoreDialect}. | ||
*/ | ||
|
||
@AutoService(JdbcDialectFactory.class) | ||
public class TablestoreDialectFactory implements JdbcDialectFactory { | ||
@Override | ||
public boolean acceptsURL(String url) { | ||
return url.startsWith("jdbc:ots:https:"); | ||
} | ||
|
||
@Override | ||
public JdbcDialect create() { | ||
return new TablestoreDialect(); | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
...nel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreJdbcRowConverter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore; | ||
|
||
import org.apache.seatunnel.api.table.type.SeaTunnelRow; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelRowType; | ||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; | ||
|
||
import java.sql.ResultSet; | ||
import java.sql.ResultSetMetaData; | ||
import java.sql.SQLException; | ||
|
||
public class TablestoreJdbcRowConverter extends AbstractJdbcRowConverter { | ||
|
||
@Override | ||
public String converterName() { | ||
return "Tablestore"; | ||
} | ||
|
||
@Override | ||
public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException { | ||
return super.toInternal(rs, metaData, typeInfo); | ||
} | ||
} |
78 changes: 78 additions & 0 deletions
78
...seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore; | ||
|
||
import org.apache.seatunnel.api.table.type.BasicType; | ||
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelDataType; | ||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import java.sql.ResultSetMetaData; | ||
import java.sql.SQLException; | ||
|
||
@Slf4j | ||
public class TablestoreTypeMapper implements JdbcDialectTypeMapper { | ||
|
||
|
||
// ============================data types===================== | ||
|
||
private static final String TABLESTORE_UNKNOWN = "UNKNOWN"; | ||
|
||
private static final String TABLESTORE_BOOL = "BOOL"; | ||
|
||
// -------------------------number---------------------------- | ||
private static final String TABLESTORE_BIGINT = "BIGINT"; | ||
private static final String TABLESTORE_DOUBLE = "DOUBLE"; | ||
// -------------------------string---------------------------- | ||
private static final String TABLESTORE_VARCHAR = "VARCHAR"; | ||
private static final String TABLESTORE_MEDIUMTEXT = "MEDIUMTEXT"; | ||
|
||
// ------------------------------blob------------------------- | ||
private static final String TABLESTORE_VARBINARY = "VARBINARY"; | ||
private static final String TABLESTORE_MEDIUMBLOB = "MEDIUMBLOB"; | ||
|
||
@SuppressWarnings("checkstyle:MagicNumber") | ||
@Override | ||
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { | ||
String tablestoreServerType = metadata.getColumnTypeName(colIndex).toUpperCase(); | ||
switch (tablestoreServerType) { | ||
case TABLESTORE_BOOL: | ||
return BasicType.BOOLEAN_TYPE; | ||
case TABLESTORE_BIGINT: | ||
return BasicType.LONG_TYPE; | ||
case TABLESTORE_DOUBLE: | ||
return BasicType.DOUBLE_TYPE; | ||
case TABLESTORE_VARCHAR: | ||
case TABLESTORE_MEDIUMTEXT: | ||
return BasicType.STRING_TYPE; | ||
case TABLESTORE_VARBINARY: | ||
case TABLESTORE_MEDIUMBLOB: | ||
return PrimitiveByteArrayType.INSTANCE; | ||
//Doesn't support yet | ||
case TABLESTORE_UNKNOWN: | ||
default: | ||
final String jdbcColumnName = metadata.getColumnName(colIndex); | ||
throw new UnsupportedOperationException( | ||
String.format( | ||
"Doesn't support TABLESTORE type '%s' on column '%s' yet.", | ||
tablestoreServerType, jdbcColumnName)); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why need add executeQuery
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executeQuery has a great impact on the performance of other databases, it is recommended to use JdbcDialect to achieve