Skip to content

Commit

Permalink
first step one
Browse files Browse the repository at this point in the history
  • Loading branch information
flaming-archer committed Mar 19, 2024
1 parent 1b9a36e commit 2de0e19
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2023 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -167,23 +167,23 @@ private SaslMetastoreClientHander(
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try {
switch (method.getName()) {
case "get_delegation_token":
try {
clientManager.open();
Object token = method.invoke(baseHandler, args);
this.delegationToken = (String) token;
clientManager.close();
setTokenStr2Ugi(UserGroupInformation.getCurrentUser(), (String) token);
clientManager.open();
return token;
} catch (IOException e) {
throw new MetastoreUnavailableException("Couldn't setup delegation token in the ugi: ", e);
}
default:
genToken();
// switch (method.getName()) {
// case "get_delegation_token":
// try {
// clientManager.open();
// Object token = method.invoke(baseHandler, args);
// this.delegationToken = (String) token;
// clientManager.close();
// setTokenStr2Ugi(UserGroupInformation.getCurrentUser(), (String) token);
// clientManager.open();
// return token;
// } catch (IOException e) {
// throw new MetastoreUnavailableException("Couldn't setup delegation token in the ugi: ", e);
// }
// default:
// genToken();
return method.invoke(baseHandler, args);
}
// }
} catch (InvocationTargetException e) {
throw e.getTargetException();
} catch (UndeclaredThrowableException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2023 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -138,6 +138,7 @@ void open(HiveUgiArgs ugiArgs) {
String tokenSig = conf.getVar(ConfVars.METASTORE_TOKEN_SIGNATURE);
// tokenSig could be null
String tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig);
tokenStrForm = null;
if (tokenStrForm != null) {
// authenticate using delegation tokens via the "DIGEST" mechanism
transport = KerberosSaslHelper
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2023 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -245,6 +245,7 @@ class FederatedHMSHandler extends FacebookBase implements CloseableIHMSHandler {
private final NotifyingFederationService notifyingFederationService;
private final WaggleDanceConfiguration waggleDanceConfiguration;
private Configuration conf;
private MetaStoreProxyServer metaStoreProxyServer;

FederatedHMSHandler(
MappingEventListener databaseMappingService,
Expand Down Expand Up @@ -1369,19 +1370,54 @@ public List<String> set_ugi(String user_name, List<String> group_names) throws M
@Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME)
public String get_delegation_token(String token_owner, String renewer_kerberos_principal_name)
throws MetaException, TException {
return getPrimaryClient().get_delegation_token(token_owner, renewer_kerberos_principal_name);
String ret = null;
try {
ret =
HiveMetaStore.getDelegationToken(token_owner,
renewer_kerberos_principal_name, metaStoreProxyServer.getIPAddress());
} catch (IOException | InterruptedException e) {
throw new MetaException(e.getMessage());
}
return ret;
}

@Override
@Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME)
public long renew_delegation_token(String token_str_form) throws MetaException, TException {
return getPrimaryClient().renew_delegation_token(token_str_form);
Long ret = null;
Exception ex = null;
try {
ret = HiveMetaStore.renewDelegationToken(token_str_form);
} catch (IOException e) {
ex = e;
throw new MetaException(e.getMessage());
} catch (Exception e) {
ex = e;
throw newMetaException(e);
}
return ret;
}

@Override
@Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME)
public void cancel_delegation_token(String token_str_form) throws MetaException, TException {
getPrimaryClient().cancel_delegation_token(token_str_form);
boolean success = false;
try {
HiveMetaStore.cancelDelegationToken(token_str_form);
} catch (IOException e) {
throw new MetaException(e.getMessage());
} catch (Exception e) {
throw newMetaException(e);
}
}

private static MetaException newMetaException(Exception e) {
if (e instanceof MetaException) {
return (MetaException)e;
}
MetaException me = new MetaException(e.toString());
me.initCause(e);
return me;
}

@Override
Expand Down Expand Up @@ -1527,6 +1563,11 @@ public void setConf(Configuration conf) {
this.conf = conf;
}

public void setMetaStoreProxyServer(
MetaStoreProxyServer metaStoreProxyServer) {
this.metaStoreProxyServer = metaStoreProxyServer;
}

@Override
public void init() throws MetaException {}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2023 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,11 +53,11 @@ public FederatedHMSHandlerFactory(
this.queryMapping = queryMapping;
}

public CloseableIHMSHandler create() {
public FederatedHMSHandler create() {
MappingEventListener service = createDatabaseMappingService();
MonitoredDatabaseMappingService monitoredService = new MonitoredDatabaseMappingService(service);

CloseableIHMSHandler baseHandler = new FederatedHMSHandler(monitoredService, notifyingFederationService,
FederatedHMSHandler baseHandler = new FederatedHMSHandler(monitoredService, notifyingFederationService,
waggleDanceConfiguration);
HiveConf conf = new HiveConf(hiveConf);
baseHandler.setConf(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
package com.hotels.bdp.waggledance.server;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -40,6 +42,7 @@
import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.shims.ShimLoader;
Expand Down Expand Up @@ -83,21 +86,24 @@ public class MetaStoreProxyServer implements ApplicationRunner {

private final HiveConf hiveConf;
private final WaggleDanceConfiguration waggleDanceConfiguration;
private final TProcessorFactory tProcessorFactory;
private final TSetIpAddressProcessorFactory tProcessorFactory;
private final Lock startLock;
private final Condition startCondition;
private TServer tServer;
private static HadoopThriftAuthBridge.Server saslServer;
private static boolean useSasl;

@Autowired
public MetaStoreProxyServer(
HiveConf hiveConf,
WaggleDanceConfiguration waggleDanceConfiguration,
TProcessorFactory tProcessorFactory) {
TSetIpAddressProcessorFactory tProcessorFactory) {
this.hiveConf = hiveConf;
this.waggleDanceConfiguration = waggleDanceConfiguration;
this.tProcessorFactory = tProcessorFactory;
startLock = new ReentrantLock();
startCondition = startLock.newCondition();
tProcessorFactory.setMetaStoreProxyServer(this);
}

private boolean isRunning() {
Expand Down Expand Up @@ -162,23 +168,21 @@ private void startWaggleDance(
boolean tcpKeepAlive = hiveConf.getBoolVar(ConfVars.METASTORE_TCP_KEEP_ALIVE);
boolean useFramedTransport = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
boolean useSSL = hiveConf.getBoolVar(ConfVars.HIVE_METASTORE_USE_SSL);
boolean useSASL = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
useSasl = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);

TServerSocket serverSocket = createServerSocket(useSSL, waggleDanceConfiguration.getPort());

if (tcpKeepAlive) {
serverSocket = new TServerSocketKeepAlive(serverSocket);
}

HadoopThriftAuthBridge.Server saslServer = null;

if(useSASL) {
if(useSasl) {
UserGroupInformation.setConfiguration(hiveConf);
saslServer = SaslHelper.createSaslServer(hiveConf);
}

TTransportFactory transFactory = createTTransportFactory(useFramedTransport, useSASL, saslServer);
TProcessorFactory tProcessorFactory = getTProcessorFactory(useSASL, saslServer);
TTransportFactory transFactory = createTTransportFactory(useFramedTransport, useSasl, saslServer);
TProcessorFactory tProcessorFactory = getTProcessorFactory(useSasl, saslServer);
log.info("Starting WaggleDance Server");

TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
Expand All @@ -192,7 +196,7 @@ private void startWaggleDance(
.requestTimeoutUnit(waggleDanceConfiguration.getThriftServerRequestTimeoutUnit());

tServer = new TThreadPoolServer(args);
if (useSASL){
if (useSasl){
TServerEventHandler tServerEventHandler = new TServerEventHandler() {
@Override
public void preServe() {
Expand Down Expand Up @@ -332,4 +336,23 @@ public void waitUntilStarted(int retries, long waitDelay, TimeUnit waitDelayTime
}
}

static String getIPAddress() {
if (useSasl) {
if (saslServer != null && saslServer.getRemoteAddress() != null) {
return saslServer.getRemoteAddress().getHostAddress();
}
} else {
// if kerberos is not enabled
Method method = null;
try {
method = HMSHandler.class.getMethod("getThreadLocalIpAddress", null);
return (String) method.invoke(null, null);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
// return HMSHandler.getThreadLocalIpAddress();
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2023 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,6 +37,7 @@ class TSetIpAddressProcessorFactory extends TProcessorFactory {
private final HiveConf hiveConf;
private final FederatedHMSHandlerFactory federatedHMSHandlerFactory;
private final TTransportMonitor transportMonitor;
private MetaStoreProxyServer metaStoreProxyServer;

@Autowired
public TSetIpAddressProcessorFactory(
Expand All @@ -56,7 +57,8 @@ public TProcessor getProcessor(TTransport transport) {
Socket socket = ((TSocket) transport).getSocket();
log.debug("Received a connection from ip: {}", socket.getInetAddress().getHostAddress());
}
CloseableIHMSHandler baseHandler = federatedHMSHandlerFactory.create();
FederatedHMSHandler baseHandler = federatedHMSHandlerFactory.create();
baseHandler.setMetaStoreProxyServer(metaStoreProxyServer);

boolean useSASL = hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
if (useSASL) {
Expand All @@ -80,4 +82,8 @@ private IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, HiveConf hive
return RetryingHMSHandler.getProxy(hiveConf, baseHandler, local);
}

public void setMetaStoreProxyServer(
MetaStoreProxyServer metaStoreProxyServer) {
this.metaStoreProxyServer = metaStoreProxyServer;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2023 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,6 @@
import java.lang.reflect.UndeclaredThrowableException;

import org.apache.hadoop.hive.metastore.IHMSHandler;
import org.apache.hadoop.security.UserGroupInformation;

import lombok.extern.log4j.Log4j2;

Expand Down Expand Up @@ -63,31 +62,31 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
// We will get the token when proxy user call in the first time.
// Login user must open connect in `TProcessorFactorySaslDecorator#getProcessor`
// so we can reuse this connect to get proxy user delegation token
if (useSasl) {
UserGroupInformation currUser = null;
String token = null;
// if call get_delegation_token , will call it directly and set token to threadlocal

switch (method.getName()) {
case "get_delegation_token":
token = (String) method.invoke(baseHandler, args);
tokens.set(token);
return token;
case "close":
tokens.remove();
return method.invoke(baseHandler, args);
default:
if (tokens.get().isEmpty() && (currUser = UserGroupInformation.getCurrentUser())
!= UserGroupInformation.getLoginUser()) {

String shortName = currUser.getShortUserName();
token = baseHandler.get_delegation_token(shortName, shortName);
log.info("get delegation token by user {}", shortName);
tokens.set(token);
}
return method.invoke(baseHandler, args);
}
}
// if (useSasl) {
// UserGroupInformation currUser = null;
// String token = null;
// // if call get_delegation_token , will call it directly and set token to threadlocal
//
// switch (method.getName()) {
// case "get_delegation_token":
// token = (String) method.invoke(baseHandler, args);
// tokens.set(token);
// return token;
// case "close":
// tokens.remove();
// return method.invoke(baseHandler, args);
// default:
// if (tokens.get().isEmpty() && (currUser = UserGroupInformation.getCurrentUser())
// != UserGroupInformation.getLoginUser()) {
//
// String shortName = currUser.getShortUserName();
// token = baseHandler.get_delegation_token(shortName, shortName);
// log.info("get delegation token by user {}", shortName);
// tokens.set(token);
// }
// return method.invoke(baseHandler, args);
// }
// }
return method.invoke(baseHandler, args);
} catch (InvocationTargetException e) {
// Need to unwrap this, so callers get the correct exception thrown by the handler.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2021 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2023 Expedia, Inc.
* Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down

0 comments on commit 2de0e19

Please sign in to comment.