[Umbrella][Feature][Connector-V2] Support create table automatically #3271
Comments
You can add a method using the catalog:
// org.apache.seatunnel.api.table.catalog.Catalog
public interface Catalog {
    void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists);
}
CatalogTable can represent partitioning keys, primary keys, comments, and more. However, the factory class is not yet enabled for the entire process: #2490
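To make the `ignoreIfExists` flag concrete, here is a minimal sketch of how an implementation of that method might behave. The `SketchTablePath`, `SketchCatalogTable`, `SketchCatalog`, and `InMemoryCatalog` types are hypothetical stand-ins written only so the example compiles on its own. They are not the real SeaTunnel classes, and a real catalog would issue DDL against the target system instead of updating a map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SeaTunnel's TablePath (assumption, not the real class).
final class SketchTablePath {
    final String database;
    final String table;

    SketchTablePath(String database, String table) {
        this.database = database;
        this.table = table;
    }

    String fullName() {
        return database + "." + table;
    }
}

// Hypothetical stand-in for CatalogTable; the real one also carries keys, partitions, etc.
final class SketchCatalogTable {
    final String comment;

    SketchCatalogTable(String comment) {
        this.comment = comment;
    }
}

// Mirrors the shape of the createTable method quoted in the comment above.
interface SketchCatalog {
    void createTable(SketchTablePath tablePath, SketchCatalogTable table, boolean ignoreIfExists);
}

// Illustrative in-memory catalog used only to show the ignoreIfExists semantics.
final class InMemoryCatalog implements SketchCatalog {
    private final Map<String, SketchCatalogTable> tables = new HashMap<>();

    @Override
    public void createTable(SketchTablePath tablePath, SketchCatalogTable table, boolean ignoreIfExists) {
        String name = tablePath.fullName();
        if (tables.containsKey(name)) {
            if (ignoreIfExists) {
                return; // table already present: leave it untouched
            }
            throw new IllegalStateException("Table already exists: " + name);
        }
        tables.put(name, table);
    }

    boolean tableExists(SketchTablePath tablePath) {
        return tables.containsKey(tablePath.fullName());
    }
}
```

With this shape, a sink could call `createTable(path, table, true)` unconditionally before writing, and the call is a no-op when the table is already there.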
To solve this problem, we should first define a default policy for how SeaTunnel handles data type mapping between Source and Sink. SeaTunnel should also allow users to customize that mapping.
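One possible shape for such a policy is a table of defaults that user-supplied overrides take precedence over. The sketch below is only an illustration of that idea: the `TypeMappingPolicy` class, its method names, and the type names in the default table are all assumptions and do not come from SeaTunnel.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical policy object: default source-to-sink type mappings plus user overrides.
final class TypeMappingPolicy {
    private final Map<String, String> defaults = new HashMap<>();
    private final Map<String, String> overrides = new HashMap<>();

    TypeMappingPolicy() {
        // Illustrative defaults only; a real policy would be defined per sink connector.
        defaults.put("STRING", "VARCHAR(255)");
        defaults.put("INT", "INTEGER");
        defaults.put("BIGINT", "BIGINT");
        defaults.put("BOOLEAN", "BOOLEAN");
    }

    // Registers a user customization that wins over the default for this source type.
    TypeMappingPolicy override(String sourceType, String sinkType) {
        overrides.put(sourceType.toUpperCase(Locale.ROOT), sinkType);
        return this;
    }

    // Resolves the sink type: user override first, then the default, else fail loudly.
    String resolve(String sourceType) {
        String key = sourceType.toUpperCase(Locale.ROOT);
        String mapped = overrides.get(key);
        if (mapped == null) {
            mapped = defaults.get(key);
        }
        if (mapped == null) {
            throw new IllegalArgumentException("No mapping for source type: " + sourceType);
        }
        return mapped;
    }
}
```

For example, `new TypeMappingPolicy().override("STRING", "TEXT")` keeps every default except the string mapping. Failing on an unmapped type, rather than guessing, is a design choice made for this sketch.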
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.
This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.
Overview
As we know, the ability to create tables automatically before writing out data is important to many users. Automatic table creation usually requires type mapping. Fortunately, this is one of our strengths, so I am proposing this feature to the community.
Design
In the sink connector, the data type is obtained after the type information has been set (see the setTypeInfo call in the execute method that follows), so a default method can be added:
default void createTable(SeaTunnelRowType seaTunnelRowType) {
    // do nothing
}
Override and implement this method for each connector, and execute it in the starter module:
public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
    DataStream<Row> input = upstreamDataStreams.get(0);
    for (int i = 0; i < plugins.size(); i++) {
        Config sinkConfig = pluginConfigs.get(i);
        SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
        DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
        seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
        // create the target table from the same row type before the sink is attached
        seaTunnelSink.createTable((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
        DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
        if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
            int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
            dataStreamSink.setParallelism(parallelism);
        }
    }
    // the sink is the last stream
    return null;
}
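As a rough illustration of what a connector-side override of the proposed `createTable` hook could look like, the sketch below builds a `CREATE TABLE IF NOT EXISTS` statement from a row type. `SketchRowType`, `SketchSink`, and `SketchJdbcSink` are hypothetical stand-ins and not SeaTunnel classes. The sketch assumes the field types have already been mapped to sink SQL types, and it records the DDL in a list instead of executing it against a database.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SeaTunnelRowType: field names plus already-mapped SQL types.
final class SketchRowType {
    final String[] fieldNames;
    final String[] sqlTypes;

    SketchRowType(String[] fieldNames, String[] sqlTypes) {
        if (fieldNames.length != sqlTypes.length) {
            throw new IllegalArgumentException("fieldNames and sqlTypes must have the same length");
        }
        this.fieldNames = fieldNames;
        this.sqlTypes = sqlTypes;
    }
}

// Mirrors the proposed default hook: sinks that do not support it do nothing.
interface SketchSink {
    default void createTable(SketchRowType rowType) {
        // do nothing
    }
}

// Illustrative sink that overrides the hook and generates DDL for its target table.
final class SketchJdbcSink implements SketchSink {
    private final String tableName;
    // Assumption for the sketch: DDL is collected here rather than sent to a database.
    final List<String> executedDdl = new ArrayList<>();

    SketchJdbcSink(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void createTable(SketchRowType rowType) {
        StringBuilder ddl = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
                .append(tableName)
                .append(" (");
        for (int i = 0; i < rowType.fieldNames.length; i++) {
            if (i > 0) {
                ddl.append(", ");
            }
            ddl.append(rowType.fieldNames[i]).append(' ').append(rowType.sqlTypes[i]);
        }
        ddl.append(")");
        executedDdl.add(ddl.toString());
    }
}
```

Because the hook is a default method, existing sinks compile unchanged and opt in only by overriding it. Using `IF NOT EXISTS` keeps the call safe to repeat on every job start, on targets that support that clause.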
Description
Currently, many sink connectors cannot create the target table if it does not exist. This feature is very important for many users.