Skip to content

Commit

Permalink
[INLONG-4431][Sort] Sort lightweight support load data to DLC (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
thesumery authored and vernedeng committed Jul 4, 2022
1 parent a68fa69 commit a8df31d
Show file tree
Hide file tree
Showing 17 changed files with 2,125 additions and 2 deletions.
1 change: 1 addition & 0 deletions inlong-sort/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<kafka.clients.version>2.7.0</kafka.clients.version>
<rat.basedir>${basedir}</rat.basedir>
<hbase.version>2.2.3</hbase.version>
<dlc.hive.version>2.3.7</dlc.hive.version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.constant;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DLCConstant {
/**
* DLC internet access domain name.
*/
public static final String DLC_ENDPOINT = "dlc.tencentcloudapi.com";

/**
* dlc account region
*/
public static final String DLC_REGION = "qcloud.dlc.region";
/**
* dlc account secret id
*/
public static final String DLC_SECRET_ID = "qcloud.dlc.secret-id";
/**
* dlc account secret key
*/
public static final String DLC_SECRET_KEY = "qcloud.dlc.secret-key";

/**
* dlc cos region
*/
public static final String FS_COS_REGION = "fs.cosn.userinfo.region";
/**
* dlc main account cos secret id
*/
public static final String FS_COS_SECRET_ID = "fs.cosn.userinfo.secretId";
/**
* dlc main account cos secret key
*/
public static final String FS_COS_SECRET_KEY = "fs.cosn.userinfo.secretKey";

public static final String FS_LAKEFS_IMPL = "fs.lakefs.impl";
public static final String FS_COS_IMPL = "fs.cosn.impl";
public static final String FS_COS_AUTH_PROVIDER = "fs.cosn.credentials.provider";

public static final String DLC_CATALOG_IMPL_CLASS =
"org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog";
public static final Map<String, String> DLC_DEFAULT_IMPL =
Collections.unmodifiableMap(new HashMap<String, String>() {
{
put(FS_LAKEFS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
put(FS_COS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
put(FS_COS_AUTH_PROVIDER, "org.apache.hadoop.fs.auth.SimpleCredentialProvider");
}
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
Expand Down Expand Up @@ -68,7 +69,8 @@
@JsonSubTypes.Type(value = IcebergLoadNode.class, name = "icebergLoad"),
@JsonSubTypes.Type(value = ElasticsearchLoadNode.class, name = "elasticsearchLoad"),
@JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
@JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
@JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad"),
@JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = "dlcIcebergLoad")
})
@NoArgsConstructor
@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
import org.apache.inlong.sort.protocol.node.load.DLCIcebergLoadNode;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
Expand Down Expand Up @@ -81,7 +82,8 @@
@JsonSubTypes.Type(value = IcebergLoadNode.class, name = "icebergLoad"),
@JsonSubTypes.Type(value = ElasticsearchLoadNode.class, name = "elasticsearchLoad"),
@JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad"),
@JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad")
@JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad"),
@JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = "dlcIcebergLoad")
})
public interface Node {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.node.load;

import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.constant.DLCConstant;
import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.List;
import java.util.Map;

@JsonTypeName("dlcIcebergLoad")
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class DLCIcebergLoadNode extends LoadNode implements Serializable {

private static final long serialVersionUID = -1L;

@JsonProperty("tableName")
@Nonnull
private String tableName;

@JsonProperty("dbName")
@Nonnull
private String dbName;

@JsonProperty("primaryKey")
private String primaryKey;

@JsonProperty("uri")
private String uri;

@JsonProperty("warehouse")
private String warehouse;

@JsonCreator
public DLCIcebergLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
@JsonProperty("fieldRelations") List<FieldRelation> fieldRelationShips,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
@JsonProperty("properties") Map<String, String> properties,
@Nonnull @JsonProperty("dbName") String dbName,
@Nonnull @JsonProperty("tableName") String tableName,
@JsonProperty("primaryKey") String primaryKey,
@JsonProperty("uri") String uri,
@JsonProperty("warehouse") String warehouse) {
super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
this.primaryKey = primaryKey;
this.uri = uri == null ? DLCConstant.DLC_ENDPOINT : uri;
this.warehouse = warehouse;
validateAuth(properties);
}

@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
options.put("connector", "iceberg");
options.put("catalog-database", dbName);
options.put("catalog-table", tableName);
options.put("default-database", dbName);
options.put("catalog-name", CatalogType.HYBRIS.name());
options.put("catalog-impl", DLCConstant.DLC_CATALOG_IMPL_CLASS);
if (null != uri) {
options.put("uri", uri);
}
if (null != warehouse) {
options.put("warehouse", warehouse);
}
options.putAll(DLCConstant.DLC_DEFAULT_IMPL);
return options;
}

@Override
public String genTableName() {
return tableName;
}

@Override
public String getPrimaryKey() {
return primaryKey;
}

@Override
public List<FieldInfo> getPartitionFields() {
return super.getPartitionFields();
}

private void validateAuth(Map<String, String> properties) {
Preconditions.checkNotNull(properties);
Preconditions.checkNotNull(properties.get(DLCConstant.DLC_SECRET_ID), "dlc secret-id is null");
Preconditions.checkNotNull(properties.get(DLCConstant.DLC_SECRET_KEY), "dlc secret-key is null");
Preconditions.checkNotNull(properties.get(DLCConstant.DLC_REGION), "dlc region is null");

Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_REGION), "cos region is null");
Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_SECRET_ID), "cos secret-id is null");
Preconditions.checkNotNull(properties.get(DLCConstant.FS_COS_SECRET_KEY), "cos secret-key is null");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.protocol.node.load;

import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.constant.DLCConstant;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
* test for dlc load node
*/
public class DLCIcebergLoadNodeTest extends SerializeBaseTest<DLCIcebergLoadNode> {
@Override
public DLCIcebergLoadNode getTestObject() {
Map<String, String> properties = new HashMap<>();
properties.put(DLCConstant.DLC_REGION, "ap-beijing");
properties.put(DLCConstant.DLC_SECRET_ID, "XXXXXXXXXXX");
properties.put(DLCConstant.DLC_SECRET_KEY, "XXXXXXXXXXX");

properties.put(DLCConstant.FS_COS_REGION, "ap-beijing");
properties.put(DLCConstant.FS_COS_SECRET_ID, "XXXXXXXXXXX");
properties.put(DLCConstant.FS_COS_SECRET_KEY, "XXXXXXXXXXX");
return new DLCIcebergLoadNode(
"iceberg_dlc",
"iceberg_dlc_output",
Arrays.asList(new FieldInfo("field", new StringFormatInfo())),
Arrays.asList(new FieldRelation(new FieldInfo("field", new StringFormatInfo()),
new FieldInfo("field", new StringFormatInfo()))),
null,
null,
null,
properties,
"inlong",
"inlong_iceberg_dlc",
null,
null,
"/iceberg_dlc/warehouse");
}
}
Loading

0 comments on commit a8df31d

Please sign in to comment.