From 2de0e1921be82c5e6e03392eac52eef3f4895679 Mon Sep 17 00:00:00 2001 From: dtb <2011xuesong@gmail.com> Date: Tue, 19 Mar 2024 17:29:46 +0800 Subject: [PATCH] first step one --- .../client/DefaultMetaStoreClientFactory.java | 34 ++++++------ .../client/ThriftMetastoreClientManager.java | 3 +- .../server/FederatedHMSHandler.java | 49 +++++++++++++++-- .../server/FederatedHMSHandlerFactory.java | 6 +-- .../server/MetaStoreProxyServer.java | 41 ++++++++++---- .../server/TSetIpAddressProcessorFactory.java | 10 +++- .../server/TokenWrappingHMSHandler.java | 53 +++++++++---------- .../FederatedHMSHandlerFactoryTest.java | 2 +- .../server/FederatedHMSHandlerTest.java | 2 +- 9 files changed, 135 insertions(+), 65 deletions(-) diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java index 4b6986048..d8f4d5770 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -167,23 +167,23 @@ private SaslMetastoreClientHander( @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { try { - switch (method.getName()) { - case "get_delegation_token": - try { - clientManager.open(); - Object token = method.invoke(baseHandler, args); - this.delegationToken = (String) token; - clientManager.close(); - setTokenStr2Ugi(UserGroupInformation.getCurrentUser(), (String) token); - clientManager.open(); - return token; - } catch (IOException e) { - throw new MetastoreUnavailableException("Couldn't setup delegation token in the ugi: ", e); - } - default: - genToken(); +// switch (method.getName()) { +// case "get_delegation_token": +// try { +// clientManager.open(); +// Object token = method.invoke(baseHandler, args); +// this.delegationToken = (String) token; +// clientManager.close(); +// setTokenStr2Ugi(UserGroupInformation.getCurrentUser(), (String) token); +// clientManager.open(); +// return token; +// } catch (IOException e) { +// throw new MetastoreUnavailableException("Couldn't setup delegation token in the ugi: ", e); +// } +// default: +// genToken(); return method.invoke(baseHandler, args); - } +// } } catch (InvocationTargetException e) { throw e.getTargetException(); } catch (UndeclaredThrowableException e) { diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java index 6ff94e8d0..311708e0f 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -138,6 +138,7 @@ void open(HiveUgiArgs ugiArgs) { String tokenSig = conf.getVar(ConfVars.METASTORE_TOKEN_SIGNATURE); // tokenSig could be null String tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig); + tokenStrForm = null; if (tokenStrForm != null) { // authenticate using delegation tokens via the "DIGEST" mechanism transport = KerberosSaslHelper diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java index 842bbd9fb..81dadf20e 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -245,6 +245,7 @@ class FederatedHMSHandler extends FacebookBase implements CloseableIHMSHandler { private final NotifyingFederationService notifyingFederationService; private final WaggleDanceConfiguration waggleDanceConfiguration; private Configuration conf; + private MetaStoreProxyServer metaStoreProxyServer; FederatedHMSHandler( MappingEventListener databaseMappingService, @@ -1369,19 +1370,54 @@ public List set_ugi(String user_name, List group_names) throws M @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) public String get_delegation_token(String token_owner, String renewer_kerberos_principal_name) throws MetaException, TException { - return getPrimaryClient().get_delegation_token(token_owner, renewer_kerberos_principal_name); + String ret = null; + try { + ret = + HiveMetaStore.getDelegationToken(token_owner, + renewer_kerberos_principal_name, metaStoreProxyServer.getIPAddress()); + } catch (IOException | InterruptedException e) { + throw new MetaException(e.getMessage()); + } + return ret; } @Override @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) public long renew_delegation_token(String token_str_form) throws MetaException, TException { - return getPrimaryClient().renew_delegation_token(token_str_form); + Long ret = null; + Exception ex = null; + try { + ret = HiveMetaStore.renewDelegationToken(token_str_form); + } catch (IOException e) { + ex = e; + throw new MetaException(e.getMessage()); + } catch (Exception e) { + ex = e; + throw newMetaException(e); + } + return ret; } @Override @Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME) public void cancel_delegation_token(String token_str_form) throws MetaException, TException { - getPrimaryClient().cancel_delegation_token(token_str_form); + boolean success = false; + try { + HiveMetaStore.cancelDelegationToken(token_str_form); + } catch (IOException e) { + throw new MetaException(e.getMessage()); + } catch (Exception e) { + throw newMetaException(e); + } + } + + private static MetaException newMetaException(Exception e) { + if (e instanceof MetaException) { + return (MetaException)e; + } + MetaException me = new MetaException(e.toString()); + me.initCause(e); + return me; } @Override @@ -1527,6 +1563,11 @@ public void setConf(Configuration conf) { this.conf = conf; } + public void setMetaStoreProxyServer( + MetaStoreProxyServer metaStoreProxyServer) { + this.metaStoreProxyServer = metaStoreProxyServer; + } + @Override public void init() throws MetaException {} diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java index d1d501ace..b6326c03c 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,11 +53,11 @@ public FederatedHMSHandlerFactory( this.queryMapping = queryMapping; } - public CloseableIHMSHandler create() { + public FederatedHMSHandler create() { MappingEventListener service = createDatabaseMappingService(); MonitoredDatabaseMappingService monitoredService = new MonitoredDatabaseMappingService(service); - CloseableIHMSHandler baseHandler = new FederatedHMSHandler(monitoredService, notifyingFederationService, + FederatedHMSHandler baseHandler = new FederatedHMSHandler(monitoredService, notifyingFederationService, waggleDanceConfiguration); HiveConf conf = new HiveConf(hiveConf); baseHandler.setConf(conf); diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java index 05c7797a3..7fe93c4d5 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java @@ -25,6 +25,8 @@ package com.hotels.bdp.waggledance.server; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -40,6 +42,7 @@ import org.apache.hadoop.hive.common.auth.HiveAuthUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive; import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; import org.apache.hadoop.hive.shims.ShimLoader; @@ -83,21 +86,24 @@ public class MetaStoreProxyServer implements ApplicationRunner { private final HiveConf hiveConf; private final WaggleDanceConfiguration waggleDanceConfiguration; - private final TProcessorFactory tProcessorFactory; + private final TSetIpAddressProcessorFactory tProcessorFactory; private final Lock startLock; private final Condition startCondition; private TServer tServer; + private static HadoopThriftAuthBridge.Server saslServer; + private static boolean useSasl; @Autowired public MetaStoreProxyServer( HiveConf hiveConf, WaggleDanceConfiguration waggleDanceConfiguration, - TProcessorFactory tProcessorFactory) { + TSetIpAddressProcessorFactory tProcessorFactory) { this.hiveConf = hiveConf; this.waggleDanceConfiguration = waggleDanceConfiguration; this.tProcessorFactory = tProcessorFactory; startLock = new ReentrantLock(); startCondition = startLock.newCondition(); + tProcessorFactory.setMetaStoreProxyServer(this); } private boolean isRunning() { @@ -162,7 +168,7 @@ private void startWaggleDance( boolean tcpKeepAlive = hiveConf.getBoolVar(ConfVars.METASTORE_TCP_KEEP_ALIVE); boolean useFramedTransport = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT); boolean useSSL = hiveConf.getBoolVar(ConfVars.HIVE_METASTORE_USE_SSL); - boolean useSASL = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); + useSasl = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL); TServerSocket serverSocket = createServerSocket(useSSL, waggleDanceConfiguration.getPort()); @@ -170,15 +176,13 @@ private void startWaggleDance( serverSocket = new TServerSocketKeepAlive(serverSocket); } - HadoopThriftAuthBridge.Server saslServer = null; - - if(useSASL) { + if(useSasl) { UserGroupInformation.setConfiguration(hiveConf); saslServer = SaslHelper.createSaslServer(hiveConf); } - TTransportFactory transFactory = createTTransportFactory(useFramedTransport, useSASL, saslServer); - TProcessorFactory tProcessorFactory = getTProcessorFactory(useSASL, saslServer); + TTransportFactory transFactory = createTTransportFactory(useFramedTransport, useSasl, saslServer); + TProcessorFactory tProcessorFactory = getTProcessorFactory(useSasl, saslServer); log.info("Starting WaggleDance Server"); TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket) @@ -192,7 +196,7 @@ private void startWaggleDance( .requestTimeoutUnit(waggleDanceConfiguration.getThriftServerRequestTimeoutUnit()); tServer = new TThreadPoolServer(args); - if (useSASL){ + if (useSasl){ TServerEventHandler tServerEventHandler = new TServerEventHandler() { @Override public void preServe() { @@ -332,4 +336,23 @@ public void waitUntilStarted(int retries, long waitDelay, TimeUnit waitDelayTime } } + static String getIPAddress() { + if (useSasl) { + if (saslServer != null && saslServer.getRemoteAddress() != null) { + return saslServer.getRemoteAddress().getHostAddress(); + } + } else { + // if kerberos is not enabled + Method method = null; + try { + method = HMSHandler.class.getMethod("getThreadLocalIpAddress", null); + return (String) method.invoke(null, null); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + // return HMSHandler.getThreadLocalIpAddress(); + } + return null; + } + } diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java index 63b1fe2bc..62d02080c 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,7 @@ class TSetIpAddressProcessorFactory extends TProcessorFactory { private final HiveConf hiveConf; private final FederatedHMSHandlerFactory federatedHMSHandlerFactory; private final TTransportMonitor transportMonitor; + private MetaStoreProxyServer metaStoreProxyServer; @Autowired public TSetIpAddressProcessorFactory( @@ -56,7 +57,8 @@ public TProcessor getProcessor(TTransport transport) { Socket socket = ((TSocket) transport).getSocket(); log.debug("Received a connection from ip: {}", socket.getInetAddress().getHostAddress()); } - CloseableIHMSHandler baseHandler = federatedHMSHandlerFactory.create(); + FederatedHMSHandler baseHandler = federatedHMSHandlerFactory.create(); + baseHandler.setMetaStoreProxyServer(metaStoreProxyServer); boolean useSASL = hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL); if (useSASL) { @@ -80,4 +82,8 @@ private IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, HiveConf hive return RetryingHMSHandler.getProxy(hiveConf, baseHandler, local); } + public void setMetaStoreProxyServer( + MetaStoreProxyServer metaStoreProxyServer) { + this.metaStoreProxyServer = metaStoreProxyServer; + } } diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TokenWrappingHMSHandler.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TokenWrappingHMSHandler.java index 4f0f7c06f..723b1dda5 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TokenWrappingHMSHandler.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TokenWrappingHMSHandler.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ import java.lang.reflect.UndeclaredThrowableException; import org.apache.hadoop.hive.metastore.IHMSHandler; -import org.apache.hadoop.security.UserGroupInformation; import lombok.extern.log4j.Log4j2; @@ -63,31 +62,31 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl // We will get the token when proxy user call in the first time. // Login user must open connect in `TProcessorFactorySaslDecorator#getProcessor` // so we can reuse this connect to get proxy user delegation token - if (useSasl) { - UserGroupInformation currUser = null; - String token = null; - // if call get_delegation_token , will call it directly and set token to threadlocal - - switch (method.getName()) { - case "get_delegation_token": - token = (String) method.invoke(baseHandler, args); - tokens.set(token); - return token; - case "close": - tokens.remove(); - return method.invoke(baseHandler, args); - default: - if (tokens.get().isEmpty() && (currUser = UserGroupInformation.getCurrentUser()) - != UserGroupInformation.getLoginUser()) { - - String shortName = currUser.getShortUserName(); - token = baseHandler.get_delegation_token(shortName, shortName); - log.info("get delegation token by user {}", shortName); - tokens.set(token); - } - return method.invoke(baseHandler, args); - } - } +// if (useSasl) { +// UserGroupInformation currUser = null; +// String token = null; +// // if call get_delegation_token , will call it directly and set token to threadlocal +// +// switch (method.getName()) { +// case "get_delegation_token": +// token = (String) method.invoke(baseHandler, args); +// tokens.set(token); +// return token; +// case "close": +// tokens.remove(); +// return method.invoke(baseHandler, args); +// default: +// if (tokens.get().isEmpty() && (currUser = UserGroupInformation.getCurrentUser()) +// != UserGroupInformation.getLoginUser()) { +// +// String shortName = currUser.getShortUserName(); +// token = baseHandler.get_delegation_token(shortName, shortName); +// log.info("get delegation token by user {}", shortName); +// tokens.set(token); +// } +// return method.invoke(baseHandler, args); +// } +// } return method.invoke(baseHandler, args); } catch (InvocationTargetException e) { // Need to unwrap this, so callers get the correct exception thrown by the handler. diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java index b1e1984ee..23c5633f7 100644 --- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java +++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2021 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java index c7b2029d0..81fba804e 100644 --- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java +++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2023 Expedia, Inc. + * Copyright (C) 2016-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.