Skip to content

Commit

Permalink
Supports iceberg sink apache#6198
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 22, 2024
1 parent e10e90b commit 24b2f9c
Show file tree
Hide file tree
Showing 14 changed files with 106 additions and 60 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/Iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## Description

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<hadoop.binary.version>2.7</hadoop.binary.version>
<jackson.version>2.13.3</jackson.version>
<lombok.version>1.18.24</lombok.version>
<commons-compress.version>1.24.0</commons-compress.version>
<commons-compress.version>1.20</commons-compress.version>
<avro.version>1.11.1</avro.version>
<skip.pmd.check>false</skip.pmd.check>
<maven.deploy.skip>false</maven.deploy.skip>
Expand Down
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
- [Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
- [Transform-V2] Add catalog support for SQL Transform plugin (#4819)
- [Connector-V2] [Assert] Support check the precision and scale of Decimal type (#6110)
- [Connector-V2] [Iceberg] Support iceberg sink #6198

### Zeta(ST-Engine)

Expand Down
14 changes: 14 additions & 0 deletions seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,18 @@
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<!--The lower version is no longer compatible with Apple M1-->
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.5.5-5</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand All @@ -84,6 +94,10 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
Expand Down
52 changes: 52 additions & 0 deletions seatunnel-connectors-v2/connector-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,19 @@
<parquet-avro.version>1.13.1</parquet-avro.version>
<avro.version>1.11.3</avro.version>
<hive.version>2.3.9</hive.version>
<connector.name>connector-iceberg</connector.name>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.5.5-5</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
Expand All @@ -54,6 +65,7 @@
<artifactId>iceberg-common</artifactId>
<version>${iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
Expand Down Expand Up @@ -200,4 +212,44 @@

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<relocations>
<relocation>
<pattern>org.apache.avro</pattern>
<!--suppress UnresolvedMavenProperty, this property is added by submodule-->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.avro</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.orc</pattern>
<shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.orc</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.parquet</pattern>
<!--suppress UnresolvedMavenProperty -->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.parquet</shadedPattern>
</relocation>
<relocation>
<pattern>shaded.parquet</pattern>
<!--suppress UnresolvedMavenProperty -->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.shaded.parquet</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynMethods;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.Serializable;
Expand All @@ -38,11 +37,10 @@
import java.nio.file.Paths;
import java.util.List;

@Slf4j
public class IcebergCatalogFactory implements Serializable {

private static final long serialVersionUID = -6003040601422350869L;
private static final Logger LOG =
LoggerFactory.getLogger(IcebergCatalogFactory.class.getName());
private static final List<String> HADOOP_CONF_FILES =
ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
private CommonConfig config;
Expand All @@ -52,6 +50,8 @@ public IcebergCatalogFactory(CommonConfig config) {
}

public Catalog loadCatalog() {
// When using the seatunel engine, set the current class loader to prevent loading failures
Thread.currentThread().setContextClassLoader(IcebergCatalogFactory.class.getClassLoader());
return CatalogUtil.buildIcebergCatalog(
config.getCatalogName(), config.getCatalogProps(), loadHadoopConfig(config));
}
Expand All @@ -77,7 +77,7 @@ private Object loadHadoopConfig(CommonConfig config) {
}

if (configClass == null) {
LOG.info("Hadoop not found on classpath, not creating Hadoop config");
log.info("Hadoop not found on classpath, not creating Hadoop config");
return null;
}

Expand All @@ -100,7 +100,7 @@ private Object loadHadoopConfig(CommonConfig config) {
try {
addResourceMethod.invoke(path.toUri().toURL());
} catch (IOException e) {
LOG.warn(
log.warn(
"Error adding Hadoop resource {}, resource was not added",
path,
e);
Expand All @@ -109,13 +109,13 @@ private Object loadHadoopConfig(CommonConfig config) {
});
}
config.getHadoopProps().forEach(setMethod::invoke);
LOG.info("Hadoop config initialized: {}", configClass.getName());
log.info("Hadoop config initialized: {}", configClass.getName());
return result;
} catch (InstantiationException
| IllegalAccessException
| NoSuchMethodException
| InvocationTargetException e) {
LOG.warn(
log.warn(
"Hadoop found on classpath but could not create config, proceeding without config",
e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;

import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
Expand All @@ -36,7 +37,7 @@ public class IcebergTableLoader implements Closeable, Serializable {

private final IcebergCatalogFactory icebergCatalogFactory;
private final String tableIdentifierStr;
private Catalog catalog;
private transient Catalog catalog;

public IcebergTableLoader(
@NonNull IcebergCatalogFactory icebergCatalogFactory,
Expand All @@ -54,12 +55,15 @@ public TableIdentifier getTableIdentifier() {
}

public IcebergTableLoader open() {
catalog = icebergCatalogFactory.loadCatalog();
catalog = CachingCatalog.wrap(icebergCatalogFactory.loadCatalog());
return this;
}

public Table loadTable() {
TableIdentifier tableIdentifier = TableIdentifier.parse(tableIdentifierStr);
if (catalog == null) {
open();
}
return catalog.loadTable(tableIdentifier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/** Iceberg aggregated committer */
@Slf4j
public class IcebergAggregatedCommitter
implements SinkAggregatedCommitter<IcebergCommitInfo, IcebergAggregatedCommitInfo> {

private transient IcebergFilesCommitter filesCommitter;
private IcebergTableLoader tableLoader;
private final IcebergFilesCommitter filesCommitter;

public IcebergAggregatedCommitter(SinkConfig config) {
IcebergTableLoader tableLoader = IcebergTableLoader.create(config).open();
Expand All @@ -45,7 +45,7 @@ public List<IcebergAggregatedCommitInfo> commit(
for (IcebergAggregatedCommitInfo commitInfo : aggregatedCommitInfo) {
commitFiles(commitInfo.commitInfos);
}
return null;
return Collections.emptyList();
}

private void commitFiles(List<IcebergCommitInfo> commitInfos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ protected List<Object> convertListValue(
SeaTunnelDataType fromType,
Types.ListType type,
SchemaChangeWrapper wrapper) {
;
Preconditions.checkArgument(value.getClass().isArray());
Object[] list = (Object[]) value;
return Arrays.stream(list)
Expand Down
9 changes: 0 additions & 9 deletions seatunnel-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@
<hive.jdbc.version>3.1.3</hive.jdbc.version>
<aliyun.sdk.oss.version>3.4.1</aliyun.sdk.oss.version>
<jdom.version>1.1</jdom.version>

<zstd-jni.version>1.5.5-5</zstd-jni.version>
</properties>
<dependencies>
<!-- starters -->
Expand Down Expand Up @@ -680,13 +678,6 @@
<version>${hadoop-aliyun.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
<scope>provided</scope>
</dependency>
<!-- hadoop jar end -->
</dependencies>
<repositories>
Expand Down
4 changes: 0 additions & 4 deletions seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,6 @@
<include>io.netty:netty-buffer:jar</include>
<include>io.netty:netty-common:jar</include>

<!--Add avro and zstd-jni jar -->
<include>com.github.luben:zstd-jni:jar</include>
<include>com.github.luben:zstd-jni:jar</include>

</includes>
<outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
<outputDirectory>/lib</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@
@Slf4j
@DisabledOnContainer(
value = {},
type = {EngineType.SEATUNNEL, EngineType.SPARK},
disabledReason = "")
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not support cdc")
@DisabledOnOs(OS.WINDOWS)
public class IcebergSinkCDCIT extends TestSuiteBase implements TestResource {

Expand Down Expand Up @@ -116,12 +116,15 @@ private String zstdUrl() {
protected final ContainerExtendedFactory extendedFactory =
container -> {
container.execInContainer("sh", "-c", "mkdir -p " + CATALOG_DIR);
container.execInContainer("sh", "-c", "chown -R flink " + CATALOG_DIR);
container.execInContainer(
"sh",
"-c",
"mkdir -p /tmp/seatunnel/lib && cd /tmp/seatunnel/lib && wget "
+ zstdUrl());
container.execInContainer("sh", "-c", "chmod -R 777 " + CATALOG_DIR);
Container.ExecResult extraCommandsZSTD =
container.execInContainer(
"sh",
"-c",
"mkdir -p /tmp/seatunnel/plugins/Iceberg/lib && cd /tmp/seatunnel/plugins/Iceberg/lib && wget "
+ zstdUrl());
Assertions.assertEquals(
0, extraCommandsZSTD.getExitCode(), extraCommandsZSTD.getStderr());
Container.ExecResult extraCommands =
container.execInContainer(
"sh",
Expand Down Expand Up @@ -198,7 +201,6 @@ public void testMysqlCdcCheckDataE2e(TestContainer container)
}
return null;
});

insertAndCheckData(container);
upsertAndCheckData(container);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

Expand Down Expand Up @@ -55,8 +55,8 @@

@Slf4j
@DisabledOnContainer(
value = {},
type = {EngineType.SEATUNNEL, EngineType.SPARK},
value = {TestContainerId.SPARK_2_4},
type = {},
disabledReason = "")
@DisabledOnOs(OS.WINDOWS)
public class IcebergSinkIT extends TestSuiteBase {
Expand All @@ -73,14 +73,12 @@ private String zstdUrl() {
protected final ContainerExtendedFactory extendedFactory =
container -> {
container.execInContainer("sh", "-c", "mkdir -p " + CATALOG_DIR);
container.execInContainer("sh", "-c", "chown 777 " + CATALOG_DIR);
Container.ExecResult extraCommands =
container.execInContainer(
"sh",
"-c",
"mkdir -p /tmp/seatunnel/lib && cd /tmp/seatunnel/lib && wget "
+ zstdUrl());
Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
container.execInContainer("sh", "-c", "chmod -R 777 " + CATALOG_DIR);
container.execInContainer(
"sh",
"-c",
"mkdir -p /tmp/seatunnel/plugins/Iceberg/lib && cd /tmp/seatunnel/plugins/Iceberg/lib && wget "
+ zstdUrl());
};

private final String NAMESPACE_TAR = NAMESPACE + ".tar.gz";
Expand Down
12 changes: 0 additions & 12 deletions seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop3.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>

</dependencies>

<build>
Expand Down

0 comments on commit 24b2f9c

Please sign in to comment.