From 340b0a6d34a197e82a4c8bd7570a6cef3978e2e5 Mon Sep 17 00:00:00 2001 From: rtotaro Date: Thu, 22 Apr 2021 06:51:11 +0200 Subject: [PATCH] #197 - In Progress: hive 3 migration --- waggle-dance-core/pom.xml | 5 ++ .../client/ThriftMetastoreClientManager.java | 19 +++--- .../mapping/model/DatabaseMapping.java | 8 +-- .../mapping/model/DatabaseMappingImpl.java | 19 ------ .../server/FederatedHMSHandler.java | 67 +------------------ .../server/MetaStoreProxyServer.java | 7 +- 6 files changed, 19 insertions(+), 106 deletions(-) diff --git a/waggle-dance-core/pom.xml b/waggle-dance-core/pom.xml index ded8fbfcc..17a8a0f86 100644 --- a/waggle-dance-core/pom.xml +++ b/waggle-dance-core/pom.xml @@ -15,6 +15,11 @@ + + + + + com.hotels hcommon-ssh diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java index ff23c4739..0aa0ee594 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2019 Expedia, Inc. + * Copyright (C) 2016-2021 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -25,12 +25,11 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.conf.HiveConfUtil; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; +import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; @@ -122,8 +121,7 @@ void open() { if (useSasl) { // Wrap thrift connection with SASL for secure connection. try { - HadoopThriftAuthBridge.Client authBridge = ShimLoader.getHadoopThriftAuthBridge().createClient(); - + HadoopThriftAuthBridge.Client authBridge = HadoopThriftAuthBridge.getBridge().createClient(); // check if we should use delegation tokens to authenticate // the call below gets hold of the tokens if they are set up by hadoop // this should happen on the map/reduce tasks if the client added the @@ -131,17 +129,18 @@ void open() { // submission. 
String tokenSig = conf.getVar(ConfVars.METASTORE_TOKEN_SIGNATURE); // tokenSig could be null - String tokenStrForm = Utils.getTokenStrForm(tokenSig); + String tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig); if (tokenStrForm != null) { // authenticate using delegation tokens via the "DIGEST" mechanism transport = authBridge .createClientTransport(null, store.getHost(), "DIGEST", tokenStrForm, transport, - MetaStoreUtils.getMetaStoreSaslProperties(conf)); + //TODO: check useSSL + MetaStoreUtils.getMetaStoreSaslProperties(conf,false)); } else { String principalConfig = conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL); transport = authBridge .createClientTransport(principalConfig, store.getHost(), "KERBEROS", null, transport, - MetaStoreUtils.getMetaStoreSaslProperties(conf)); + MetaStoreUtils.getMetaStoreSaslProperties(conf,false)); } } catch (IOException ioe) { LOG.error("Couldn't create client transport", ioe); diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/DatabaseMapping.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/DatabaseMapping.java index a26699f1a..283f84ecc 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/DatabaseMapping.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/DatabaseMapping.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2020 Expedia, Inc. + * Copyright (C) 2016-2021 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeRequest; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; -import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.LockRequest; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionSpec; @@ -56,7 +55,6 @@ public interface DatabaseMapping extends MetaStoreMapping { - Index transformInboundIndex(Index index); Partition transformInboundPartition(Partition partition); @@ -64,8 +62,6 @@ public interface DatabaseMapping extends MetaStoreMapping { HiveObjectRef transformInboundHiveObjectRef(HiveObjectRef function); - Index transformOutboundIndex(Index index); - Partition transformOutboundPartition(Partition partition); Table transformOutboundTable(Table table); @@ -116,8 +112,6 @@ public interface DatabaseMapping extends MetaStoreMapping { List transformInboundPartitions(List partitions); - List transformOutboundIndexes(List indexes); - ColumnStatistics transformInboundColumnStatistics(ColumnStatistics columnStatistics); ColumnStatistics transformOutboundColumnStatistics(ColumnStatistics columnStatistics); diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/DatabaseMappingImpl.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/DatabaseMappingImpl.java index eb6df5c3a..bafdf6777 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/DatabaseMappingImpl.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/model/DatabaseMappingImpl.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.HiveObjectType; -import org.apache.hadoop.hive.metastore.api.Index; import 
org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockRequest; @@ -133,12 +132,6 @@ public Partition transformOutboundPartition(Partition partition) { return partition; } - @Override - public Index transformOutboundIndex(Index index) { - index.setDbName(metaStoreMapping.transformOutboundDatabaseName(index.getDbName())); - return index; - } - @Override public Table transformInboundTable(Table table) { table.setDbName(metaStoreMapping.transformInboundDatabaseName(table.getDbName())); @@ -151,11 +144,6 @@ public Partition transformInboundPartition(Partition partition) { return partition; } - @Override - public Index transformInboundIndex(Index index) { - index.setDbName(metaStoreMapping.transformInboundDatabaseName(index.getDbName())); - return index; - } @Override public Function transformOutboundFunction(Function function) { @@ -375,13 +363,6 @@ public List transformInboundPartitions(List partitions) { return partitions; } - @Override - public List transformOutboundIndexes(List indexes) { - for (Index index : indexes) { - transformOutboundIndex(index); - } - return indexes; - } @Override public ColumnStatistics transformInboundColumnStatistics(ColumnStatistics columnStatistics) { diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java index a4bf274f8..0cd61e3ef 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java @@ -77,7 +77,6 @@ import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.HiveObjectType; -import org.apache.hadoop.hive.metastore.api.Index; import 
org.apache.hadoop.hive.metastore.api.InvalidInputException; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; @@ -361,6 +360,8 @@ public List get_tables(String db_name, String pattern) throws MetaExcept return mapping.getMetastoreFilter().filterTableNames(db_name, resultTables); } + + @Override @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) public List get_all_tables(String db_name) throws MetaException, TException { @@ -915,70 +916,6 @@ public boolean isPartitionMarkedForEvent( .isPartitionMarkedForEvent(mapping.transformInboundDatabaseName(db_name), tbl_name, part_vals, eventType); } - @Override - @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) - public Index add_index(Index new_index, Table index_table) - throws InvalidObjectException, AlreadyExistsException, MetaException, TException { - DatabaseMapping mapping = checkWritePermissionsAndCheckTableAllowed(new_index.getDbName(), new_index.getOrigTableName()); - checkWritePermissionsAndCheckTableAllowed(index_table.getDbName(), index_table.getTableName(), mapping); - Index result = mapping - .getClient() - .add_index(mapping.transformInboundIndex(new_index), mapping.transformInboundTable(index_table)); - return mapping.transformOutboundIndex(result); - } - - @Override - @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) - public void alter_index(String dbname, String base_tbl_name, String idx_name, Index new_idx) - throws InvalidOperationException, MetaException, TException { - DatabaseMapping mapping = checkWritePermissionsAndCheckTableAllowed(dbname, base_tbl_name); - checkWritePermissionsAndCheckTableAllowed(new_idx.getDbName(), new_idx.getOrigTableName(), mapping); - mapping - .getClient() - .alter_index(mapping.transformInboundDatabaseName(dbname), base_tbl_name, idx_name, - mapping.transformInboundIndex(new_idx)); - } - - 
@Override - @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) - public boolean drop_index_by_name(String db_name, String tbl_name, String index_name, boolean deleteData) - throws NoSuchObjectException, MetaException, TException { - DatabaseMapping mapping = checkWritePermissionsAndCheckTableAllowed(db_name, tbl_name); - return mapping - .getClient() - .drop_index_by_name(mapping.transformInboundDatabaseName(db_name), tbl_name, index_name, deleteData); - } - - @Override - @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) - public Index get_index_by_name(String db_name, String tbl_name, String index_name) - throws MetaException, NoSuchObjectException, TException { - DatabaseMapping mapping = getDbMappingAndCheckTableAllowed(db_name, tbl_name); - Index result = mapping.getClient().get_index_by_name(mapping.transformInboundDatabaseName(db_name), tbl_name, index_name); - return mapping.transformOutboundIndex(mapping.getMetastoreFilter().filterIndex(result)); - } - - @Override - @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) - public List get_indexes(String db_name, String tbl_name, short max_indexes) - throws NoSuchObjectException, MetaException, TException { - DatabaseMapping mapping = getDbMappingAndCheckTableAllowed(db_name, tbl_name); - List indexes = mapping - .getClient() - .get_indexes(mapping.transformInboundDatabaseName(db_name), tbl_name, max_indexes); - return mapping.transformOutboundIndexes(mapping.getMetastoreFilter().filterIndexes(indexes)); - } - - @Override - @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) - public List get_index_names(String db_name, String tbl_name, short max_indexes) - throws MetaException, TException { - DatabaseMapping mapping = getDbMappingAndCheckTableAllowed(db_name, tbl_name); - List result = mapping.getClient() - .get_index_names(mapping.transformInboundDatabaseName(db_name), tbl_name, max_indexes); 
- return mapping.getMetastoreFilter().filterIndexNames(db_name, tbl_name, result); - } - @Override @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) public boolean update_table_column_statistics(ColumnStatistics stats_obj) diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java index 705b4fe6c..e3661b45e 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; @@ -125,7 +124,7 @@ public void run(ApplicationArguments args) throws Exception { })); AtomicBoolean startedServing = new AtomicBoolean(); - startWaggleDance(ShimLoader.getHadoopThriftAuthBridge(), startLock, startCondition, startedServing); + startWaggleDance(startLock, startCondition, startedServing); } catch (Throwable t) { // Catch the exception, log it and rethrow it. LOG.error("WaggleDance Thrift Server threw an exception...", t); @@ -134,16 +133,14 @@ public void run(ApplicationArguments args) throws Exception { } /** - * Start Metastore based on a passed {@link HadoopThriftAuthBridge} + * Start Metastore based on the passed lock, condition and serving flag. * - * @param bridge * @param startLock * @param startCondition * @param startedServing * @throws Throwable */ private void startWaggleDance( - HadoopThriftAuthBridge bridge, Lock startLock, Condition startCondition, AtomicBoolean startedServing)