Skip to content

Commit

Permalink
Supports iceberg sink apache#6198
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 21, 2024
1 parent 8b53bea commit 112ddea
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 9 deletions.
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@
- [Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
- [Transform-V2] Add catalog support for SQL Transform plugin (#4819)
- [Connector-V2] [Assert] Support check the precision and scale of Decimal type (#6110)
- [Connector-V2] [Iceberg] Support iceberg sink #6198

### Zeta(ST-Engine)

Expand Down
47 changes: 47 additions & 0 deletions seatunnel-connectors-v2/connector-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<parquet-avro.version>1.13.1</parquet-avro.version>
<avro.version>1.11.3</avro.version>
<hive.version>2.3.9</hive.version>
<connector.name>connector-iceberg</connector.name>
</properties>

<dependencies>
Expand All @@ -54,6 +55,7 @@
<artifactId>iceberg-common</artifactId>
<version>${iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
Expand Down Expand Up @@ -200,4 +202,49 @@

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<relocations>
<relocation>
<pattern>org.apache.avro</pattern>
<!--suppress UnresolvedMavenProperty, this property is added by submodule-->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.avro</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.orc</pattern>
<shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.orc</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.parquet</pattern>
<!--suppress UnresolvedMavenProperty -->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.parquet</shadedPattern>
</relocation>
<relocation>
<pattern>shaded.parquet</pattern>
<!--suppress UnresolvedMavenProperty -->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.shaded.parquet</shadedPattern>
</relocation>
<relocation>
<pattern>com.github.luben</pattern>
<!--suppress UnresolvedMavenProperty -->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.com.github.luben</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ protected List<Object> convertListValue(
SeaTunnelDataType fromType,
Types.ListType type,
SchemaChangeWrapper wrapper) {
;
Preconditions.checkArgument(value.getClass().isArray());
Object[] list = (Object[]) value;
return Arrays.stream(list)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

Expand Down Expand Up @@ -70,8 +71,8 @@

@Slf4j
@DisabledOnContainer(
value = {},
type = {EngineType.SEATUNNEL, EngineType.SPARK},
value = {TestContainerId.SPARK_2_4},
type = {EngineType.SEATUNNEL},
disabledReason = "")
@DisabledOnOs(OS.WINDOWS)
public class IcebergSinkCDCIT extends TestSuiteBase implements TestResource {
Expand Down Expand Up @@ -198,13 +199,11 @@ public void testMysqlCdcCheckDataE2e(TestContainer container)
}
return null;
});

insertAndCheckData(container);
upsertAndCheckData(container);
}

private void upsertAndCheckData(TestContainer container)
throws InterruptedException, IOException {
private void upsertAndCheckData(TestContainer container) throws InterruptedException, IOException {
upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE);
// Waiting 30s for source capture data
sleep(30000);
Expand Down Expand Up @@ -276,6 +275,7 @@ private List<Record> loadIcebergTable() {
return results;
}


private void clearTable(String database, String tableName) {
executeSql("truncate table " + database + "." + tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

Expand Down Expand Up @@ -55,8 +56,8 @@

@Slf4j
@DisabledOnContainer(
value = {},
type = {EngineType.SEATUNNEL, EngineType.SPARK},
value = {TestContainerId.SPARK_2_4},
type = {EngineType.SEATUNNEL},
disabledReason = "")
@DisabledOnOs(OS.WINDOWS)
public class IcebergSinkIT extends TestSuiteBase {
Expand All @@ -73,7 +74,7 @@ private String zstdUrl() {
protected final ContainerExtendedFactory extendedFactory =
container -> {
container.execInContainer("sh", "-c", "mkdir -p " + CATALOG_DIR);
container.execInContainer("sh", "-c", "chown 777 " + CATALOG_DIR);
container.execInContainer("sh", "-c", "chown -R flink " + CATALOG_DIR);
Container.ExecResult extraCommands =
container.execInContainer(
"sh",
Expand Down

0 comments on commit 112ddea

Please sign in to comment.