From 34a827ed2f39e36e742f8a3c4201bb4e36b69c0c Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Sat, 22 Apr 2023 22:32:39 +0800 Subject: [PATCH] Add OSS/S3 to cluster-mode type #4621 --- .../imap-storage-file/pom.xml | 28 ++++++ .../storage/file/IMapFileStorageFactory.java | 18 +++- .../file/config/AbstractConfiguration.java | 66 ++++++++++++++ .../file/config/FileConfiguration.java | 46 ++++++++++ .../file/config/HdfsConfiguration.java | 91 +++++++++++++++++++ .../storage/file/config/OssConfiguration.java | 48 ++++++++++ .../storage/file/config/S3Configuration.java | 60 ++++++++++++ 7 files changed, 353 insertions(+), 4 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java create mode 100644 seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/FileConfiguration.java create mode 100644 seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java create mode 100644 seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java create mode 100644 seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml index c4fd403ae11b..4a073aac53c2 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml @@ -30,12 +30,19 @@ imap-storage-file SeaTunnel : Engine : Storage : IMap Storage Plugins : File + + 3.0.0 + 3.1.4 + 1.11.271 + + org.apache.seatunnel serializer-protobuf ${project.version} + org.apache.seatunnel seatunnel-hadoop3-3.1.4-uber @@ -62,6 +69,27 @@ org.awaitility awaitility + + + org.apache.hadoop + hadoop-aliyun + ${hadoop-aliyun.version} + provided + + + + org.apache.hadoop + hadoop-aws + ${hadoop-aws.version} + provided + + + com.amazonaws + aws-java-sdk-bundle + ${aws.java.sdk.version} + provided + + diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java index 7309770d0521..77f238db9468 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java @@ -23,8 +23,8 @@ import org.apache.seatunnel.engine.imap.storage.api.IMapStorage; import org.apache.seatunnel.engine.imap.storage.api.IMapStorageFactory; import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; - -import org.apache.hadoop.conf.Configuration; +import org.apache.seatunnel.engine.imap.storage.file.config.AbstractConfiguration; +import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import com.google.auto.service.AutoService; @@ -34,6 +34,9 @@ @AutoService(IMapStorageFactory.class) public class IMapFileStorageFactory implements IMapStorageFactory { + + private static final String STORAGE_TYPE_KEY = "storage.type"; + @Override public String factoryIdentifier() { return "hdfs"; @@ -41,9 +44,16 @@ public String factoryIdentifier() { @Override public IMapStorage create(Map initMap) throws IMapStorageException { + + String storageType = + String.valueOf( + initMap.getOrDefault(STORAGE_TYPE_KEY, FileConfiguration.HDFS.toString())); + // build configuration + AbstractConfiguration configuration = + FileConfiguration.valueOf(storageType.toUpperCase()).getConfiguration(storageType); + configuration.buildConfiguration(initMap); + IMapFileStorage iMapFileStorage = new IMapFileStorage(); - Configuration configuration = new Configuration(); - configuration.set("fs.defaultFS", (String) initMap.get("fs.defaultFS")); initMap.put(HDFS_CONFIG_KEY, configuration); iMapFileStorage.initialize(initMap); return iMapFileStorage; diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java new file mode 100644 index 000000000000..a98e1be61367 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.config; + +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Map; + +public abstract class AbstractConfiguration { + + protected static final String HDFS_IMPL_KEY = "impl"; + + /** + * check the configuration keys + * + * @param config configuration + * @param keys keys + */ + void checkConfiguration(Map config, String... keys) { + for (String key : keys) { + if (!config.containsKey(key) || null == config.get(key)) { + throw new IllegalArgumentException(key + " is required"); + } + } + } + + public abstract Configuration buildConfiguration(Map config) + throws IMapStorageException; + + /** + * set extra options for configuration + * + * @param hadoopConf + * @param config + * @param prefix + */ + void setExtraConfiguration( + Configuration hadoopConf, Map config, String prefix) { + config.forEach( + (k, v) -> { + if (k.startsWith(prefix)) { + hadoopConf.set(k, String.valueOf(v)); + } + }); + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/FileConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/FileConfiguration.java new file mode 100644 index 000000000000..22a0637ea88d --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/FileConfiguration.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.config; + +public enum FileConfiguration { + HDFS("hdfs", new HdfsConfiguration()), + S3("s3", new S3Configuration()), + OSS("oss", new OssConfiguration()); + + /** file system type */ + private final String name; + + /** file system configuration */ + private final AbstractConfiguration configuration; + + FileConfiguration(String name, AbstractConfiguration configuration) { + this.name = name; + this.configuration = configuration; + } + + public AbstractConfiguration getConfiguration(String name) { + return configuration; + } + + public String getName() { + return name; + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java new file mode 100644 index 000000000000..312d8b069e2a --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.config; + +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.util.Map; + +import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; + +public class HdfsConfiguration extends AbstractConfiguration { + private static final String HDFS_DEF_FS_NAME = "fs.defaultFS"; + private static final String KERBEROS_PRINCIPAL = "kerberosPrincipal"; + private static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath"; + private static final String HADOOP_SECURITY_AUTHENTICATION_KEY = + "hadoop.security.authentication"; + private static final String KERBEROS_KEY = "kerberos"; + private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem"; + private static final String HDFS_IMPL_KEY = "fs.hdfs.impl"; + + @Override + public Configuration buildConfiguration(Map config) + throws IMapStorageException { + checkConfiguration(config, HDFS_DEF_FS_NAME); + Configuration hadoopConf = new Configuration(); + + hadoopConf.set(HDFS_DEF_FS_NAME, String.valueOf(config.get(HDFS_DEF_FS_NAME))); + + hadoopConf.set(HDFS_IMPL_KEY, HDFS_IMPL); + hadoopConf.set(FS_DEFAULT_NAME_KEY, String.valueOf(config.get(FS_DEFAULT_NAME_KEY))); + if (config.containsKey(KERBEROS_PRINCIPAL) + && config.containsKey(KERBEROS_KEYTAB_FILE_PATH)) { + String kerberosPrincipal = String.valueOf(config.get(KERBEROS_PRINCIPAL)); + String kerberosKeytabFilePath = String.valueOf(config.get(KERBEROS_KEYTAB_FILE_PATH)); + if (StringUtils.isNotBlank(kerberosPrincipal) + && StringUtils.isNotBlank(kerberosKeytabFilePath)) { + hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, KERBEROS_KEY); + authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf); + } + } + // todo support other hdfs optional config keys + return hadoopConf; + } + + /** + * Authenticate kerberos + * + * @param kerberosPrincipal + * @param kerberosKeytabFilePath + * @param hdfsConf + * @throws IMapStorageException + */ + private void authenticateKerberos( + String kerberosPrincipal, String kerberosKeytabFilePath, Configuration hdfsConf) + throws IMapStorageException { + UserGroupInformation.setConfiguration(hdfsConf); + try { + UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); + } catch (IOException e) { + throw new IMapStorageException( + "Failed to login user from keytab : " + + kerberosKeytabFilePath + + " and kerberos principal : " + + kerberosPrincipal, + e); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java new file mode 100644 index 000000000000..d36aa3414579 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.config; + +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Map; + +import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; + +public class OssConfiguration extends AbstractConfiguration { + public static final String OSS_BUCKET_KEY = "oss.bucket"; + private static final String OSS_IMPL_KEY = "fs.oss.impl"; + private static final String HDFS_OSS_IMPL = + "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"; + private static final String OSS_KEY = "fs.oss."; + + @Override + public Configuration buildConfiguration(Map config) + throws IMapStorageException { + checkConfiguration(config, OSS_BUCKET_KEY); + Configuration hadoopConf = new Configuration(); + hadoopConf.set(FS_DEFAULT_NAME_KEY, String.valueOf(config.get(OSS_BUCKET_KEY))); + hadoopConf.set(OSS_IMPL_KEY, HDFS_OSS_IMPL); + setExtraConfiguration(hadoopConf, config, OSS_KEY); + return hadoopConf; + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java new file mode 100644 index 000000000000..5d34f7814bbc --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.config; + +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Map; + +import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; + +public class S3Configuration extends AbstractConfiguration { + public static final String S3_BUCKET_KEY = "s3.bucket"; + private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem"; + private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem"; + private static final String S3A_PROTOCOL = "s3a"; + private static final String DEFAULT_PROTOCOL = "s3n"; + private static final String S3_FORMAT_KEY = "fs.%s.%s"; + private static final String SPLIT_CHAR = "."; + private static final String FS_KEY = "fs."; + + @Override + public Configuration buildConfiguration(Map config) + throws IMapStorageException { + checkConfiguration(config, S3_BUCKET_KEY); + String protocol = DEFAULT_PROTOCOL; + if (config.get(S3_BUCKET_KEY).toString().startsWith(S3A_PROTOCOL)) { + protocol = S3A_PROTOCOL; + } + String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL; + Configuration hadoopConf = new Configuration(); + hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY).toString()); + hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl); + setExtraConfiguration(hadoopConf, config, FS_KEY + protocol + SPLIT_CHAR); + return hadoopConf; + } + + private String formatKey(String protocol, String key) { + return String.format(S3_FORMAT_KEY, protocol, key); + } +}