Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

modify triple unique id logic #1006

Merged
merged 1 commit into from
Feb 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
import com.alipay.sofa.rpc.bootstrap.DefaultProviderBootstrap;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.alipay.sofa.rpc.proxy.ProxyFactory;
import com.alipay.sofa.rpc.server.ProviderProxyInvoker;

import java.lang.reflect.Method;

Expand All @@ -34,8 +30,6 @@
@Extension("tri")
public class TripleProviderBootstrap<T> extends DefaultProviderBootstrap<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(TripleProviderBootstrap.class);

/**
* 构造函数
*
Expand All @@ -45,26 +39,6 @@ protected TripleProviderBootstrap(ProviderConfig<T> providerConfig) {
super(providerConfig);
}

@Override
protected void preProcessProviderTarget(ProviderConfig providerConfig, ProviderProxyInvoker providerProxyInvoker) {
Class<?> implClass = providerConfig.getRef().getClass();
try {
Method method = implClass.getMethod("setProxiedImpl", providerConfig.getProxyClass());
Object obj = ProxyFactory.buildProxy(providerConfig.getProxy(), providerConfig.getProxyClass(),
providerProxyInvoker);
method.invoke(providerConfig.getRef(), obj);
} catch (NoSuchMethodException e) {
LOGGER
.info(
"{} don't hava method setProxiedImpl, will treated as origin provider service instead of grpc service.",
implClass);
} catch (Exception e) {
throw new IllegalStateException(
"Failed to set sofa proxied service impl to stub, please make sure your stub "
+ "was generated by the sofa-protoc-compiler.", e);
}
}

@Override
public void export() {
Class enclosingClass = this.getProviderConfig().getProxyClass().getEnclosingClass();
Expand Down
2 changes: 1 addition & 1 deletion example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<properties>
<protoc.version>3.11.0</protoc.version>
<protoc-gen-grpc-java.version>1.17.0</protoc-gen-grpc-java.version>
<sofa.rpc.compiler.version>0.0.3</sofa.rpc.compiler.version>
<sofa.rpc.compiler.version>0.0.2</sofa.rpc.compiler.version>
<grpc.version>1.28.0</grpc.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,4 @@ public class TripleConstant {
public static final String TRIPLE_EXPOSE_OLD = "triple.use.old.path";
public static final Boolean EXPOSE_OLD_UNIQUE_ID_SERVICE = SofaConfigs
.getOrDefault(RpcConfigKeys.TRIPLE_EXPOSE_OLD_UNIQUE_ID_SERVICE);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public class TripleContants {

public static final String SOFA_CONSUMER_CONFIG_KEY = "_SOFA_CONSUMER_CONFIG";

public static final String SOFA_UNIQUE_ID = "_SOFA_UNIQUE_ID";

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.alipay.sofa.rpc.server.triple;

import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
Expand Down Expand Up @@ -54,17 +53,17 @@
import triple.Request;
import triple.Response;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.alipay.sofa.rpc.constant.TripleConstant.EXPOSE_OLD_UNIQUE_ID_SERVICE;
import static com.alipay.sofa.rpc.constant.TripleConstant.TRIPLE_EXPOSE_OLD;
import static com.alipay.sofa.rpc.utils.SofaProtoUtils.getFullNameWithUniqueId;
import static io.grpc.MethodDescriptor.generateFullMethodName;

/**
Expand All @@ -81,12 +80,6 @@ public class TripleServer implements Server {
*/
private static final Logger LOGGER = LoggerFactory
.getLogger(TripleServer.class);
/**
* The mapping relationship between BindableService and ServerServiceDefinition
*/
protected ConcurrentHashMap<ProviderConfig, ServerServiceDefinition> oldServiceInfo = new ConcurrentHashMap<ProviderConfig,
ServerServiceDefinition>();

/**
* server config
*/
Expand All @@ -112,6 +105,10 @@ public class TripleServer implements Server {
*/
protected ConcurrentHashMap<ProviderConfig, ServerServiceDefinition> serviceInfo = new ConcurrentHashMap<ProviderConfig,
ServerServiceDefinition>();
/**
* The mapping relationship between service name and unique id invoker
*/
protected Map<String, UniqueIdInvoker> invokerMap = new ConcurrentHashMap<>();

/**
* invoker count
Expand All @@ -124,6 +121,11 @@ public class TripleServer implements Server {
*/
private ThreadPoolExecutor bizThreadPool;

/**
* lock
*/
private Lock lock;

@Override
public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
Expand All @@ -135,6 +137,7 @@ public void init(ServerConfig serverConfig) {
.executor(bizThreadPool)
.channelType(constructChannel())
.build();
this.lock = new ReentrantLock();
}

private Class<? extends ServerChannel> constructChannel() {
Expand Down Expand Up @@ -238,86 +241,75 @@ public void stop() {
@Override
public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
Object ref = providerConfig.getRef();
this.lock.lock();
try {
ServerServiceDefinition serviceDef;
// wrap invoker to support unique id
UniqueIdInvoker oldInvoker = this.invokerMap.putIfAbsent(providerConfig.getInterfaceId(), new UniqueIdInvoker());
if (null != oldInvoker) {
// this service has already registered into grpc server,
// we only need register given invoker into unique id invoker.
if (!oldInvoker.registerInvoker(providerConfig, instance)) {
throw new IllegalStateException("Can not expose service with interface:" + providerConfig.getInterfaceId() + " and unique id: " + providerConfig.getUniqueId());
}
return;
}

UniqueIdInvoker uniqueIdInvoker = this.invokerMap.get(providerConfig.getInterfaceId());
if (!uniqueIdInvoker.registerInvoker(providerConfig, instance)) {
throw new IllegalStateException("Can not expose service with interface:" + providerConfig.getInterfaceId() + " and unique id: " + providerConfig.getUniqueId());
}

// create service definition
ServerServiceDefinition serviceDef;
if (SofaProtoUtils.isProtoClass(ref)) {
// refer is BindableService
this.setBindableProxiedImpl(providerConfig, uniqueIdInvoker);
BindableService bindableService = (BindableService) providerConfig.getRef();
serviceDef = bindableService.bindService();

} else {
Object obj = ProxyFactory.buildProxy(providerConfig.getProxy(), providerConfig.getProxyClass(),
instance);
// normal class
Object obj = ProxyFactory.buildProxy(providerConfig.getProxy(), providerConfig.getProxyClass(), uniqueIdInvoker);
GenericServiceImpl genericService = new GenericServiceImpl(obj, providerConfig.getProxyClass());
genericService.setProxiedImpl(genericService);
serviceDef = buildSofaServiceDef(genericService, providerConfig);
}
ServerServiceDefinition oldServiceDef;
oldServiceDef = serviceDef;
if (StringUtils.isNotBlank(providerConfig.getUniqueId())) {
serviceDef = appendUniqueIdToServiceDef(providerConfig.getUniqueId(), serviceDef);
if (exposeOldUniqueIdService(providerConfig)) {
List<TripleServerInterceptor> interceptorList = buildInterceptorChain(oldServiceDef);
ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(
oldServiceDef, interceptorList);
oldServiceInfo.put(providerConfig, serviceDefinition);
ServerServiceDefinition ssd = handlerRegistry.addService(serviceDefinition);
if (ssd != null) {
throw new IllegalStateException("Can not expose service with same name:" +
serviceDefinition.getServiceDescriptor().getName());
}
}
}

List<TripleServerInterceptor> interceptorList = buildInterceptorChain(serviceDef);
ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(
serviceDef, interceptorList);
serviceInfo.put(providerConfig, serviceDefinition);
ServerServiceDefinition ssd = handlerRegistry.addService(serviceDefinition);
this.serviceInfo.put(providerConfig, serviceDefinition);
ServerServiceDefinition ssd = this.handlerRegistry.addService(serviceDefinition);
if (ssd != null) {
throw new IllegalStateException("Can not expose service with same name:" +
serviceDefinition.getServiceDescriptor().getName());
}
invokerCnt.incrementAndGet();
this.invokerCnt.incrementAndGet();
} catch (Exception e) {
String msg = "Register triple service error";
LOGGER.error(msg, e);
serviceInfo.remove(providerConfig);
this.serviceInfo.remove(providerConfig);
throw new SofaRpcRuntimeException(msg, e);
}

}

private boolean exposeOldUniqueIdService(ProviderConfig providerConfig) {
//default false
String exposeOld = providerConfig.getParameter(TRIPLE_EXPOSE_OLD);
if (StringUtils.isBlank(exposeOld)) {
return EXPOSE_OLD_UNIQUE_ID_SERVICE;
} else {
return Boolean.parseBoolean(exposeOld);
} finally {
this.lock.unlock();
}
}

private ServerServiceDefinition appendUniqueIdToServiceDef(String uniqueId, ServerServiceDefinition serviceDef) {
final String newServiceName = serviceDef.getServiceDescriptor().getName() + "." + uniqueId;

ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(newServiceName);

Collection<ServerMethodDefinition<?, ?>> methods = serviceDef.getMethods();
for (ServerMethodDefinition method : methods) {
MethodDescriptor<?, ?> methodDescriptor = method.getMethodDescriptor();
String fullMethodName = methodDescriptor.getFullMethodName();
MethodDescriptor<?, ?> methodDescriptorWithUniqueId = methodDescriptor
.toBuilder()
.setFullMethodName(
getFullNameWithUniqueId(fullMethodName, uniqueId))
.build();
ServerMethodDefinition<?, ?> newServerMethodDefinition = ServerMethodDefinition.create(
methodDescriptorWithUniqueId, method.getServerCallHandler());
builder.addMethod(newServerMethodDefinition);
private void setBindableProxiedImpl(ProviderConfig providerConfig, Invoker invoker) {
Class<?> implClass = providerConfig.getRef().getClass();
try {
Method method = implClass.getMethod("setProxiedImpl", providerConfig.getProxyClass());
Object obj = ProxyFactory.buildProxy(providerConfig.getProxy(), providerConfig.getProxyClass(), invoker);
method.invoke(providerConfig.getRef(), obj);
} catch (NoSuchMethodException e) {
LOGGER
.info(
"{} don't hava method setProxiedImpl, will treated as origin provider service instead of grpc service.",
implClass);
} catch (Exception e) {
throw new IllegalStateException(
"Failed to set sofa proxied service impl to stub, please make sure your stub "
+ "was generated by the sofa-protoc-compiler.", e);
}
serviceDef = builder.build();
return serviceDef;
}

private ServerServiceDefinition buildSofaServiceDef(GenericServiceImpl genericService,
Expand All @@ -332,15 +324,12 @@ private ServerServiceDefinition buildSofaServiceDef(GenericServiceImpl genericSe
templateDefinition, providerConfig, methodDescriptor));
for (ServerMethodDefinition<Request, Response> methodDef : methodDefs) {
builder.addMethod(methodDef);

}
return builder.build();

}

private List<ServerMethodDefinition<Request, Response>> getMethodDefinitions(ServerCallHandler<Request, Response> templateHandler,List<MethodDescriptor<Request, Response>> methodDescriptors) {
List<ServerMethodDefinition<Request, Response>> result = new ArrayList<>();

for (MethodDescriptor<Request, Response> methodDescriptor : methodDescriptors) {
ServerMethodDefinition<Request, Response> serverMethodDefinition = ServerMethodDefinition.create(methodDescriptor, templateHandler);
result.add(serverMethodDefinition);
Expand All @@ -360,9 +349,8 @@ private ServiceDescriptor getServiceDescriptor(ServerServiceDefinition template,

}

private List<MethodDescriptor<Request, Response>> getMethodDescriptor( ProviderConfig providerConfig) {
private List<MethodDescriptor<Request, Response>> getMethodDescriptor(ProviderConfig providerConfig) {
List<MethodDescriptor<Request, Response>> result = new ArrayList<>();

Set<String> methodNames = SofaProtoUtils.getMethodNames(providerConfig.getInterfaceId());
for (String name : methodNames) {
MethodDescriptor<Request, Response> methodDescriptor = MethodDescriptor.<Request, Response>newBuilder()
Expand All @@ -376,7 +364,6 @@ private List<MethodDescriptor<Request, Response>> getMethodDescriptor( ProviderC
.build();
result.add(methodDescriptor);
}

return result;
}

Expand All @@ -394,18 +381,24 @@ protected List<TripleServerInterceptor> buildInterceptorChain(ServerServiceDefin

@Override
public void unRegisterProcessor(ProviderConfig providerConfig, boolean closeIfNoEntry) {
this.lock.lock();
try {
ServerServiceDefinition serverServiceDefinition = serviceInfo.get(providerConfig);
if (exposeOldUniqueIdService(providerConfig)) {
ServerServiceDefinition oldServiceDef = oldServiceInfo.get(providerConfig);
if (oldServiceDef != null) {
handlerRegistry.removeService(oldServiceDef);
ServerServiceDefinition serverServiceDefinition = this.serviceInfo.get(providerConfig);
UniqueIdInvoker uniqueIdInvoker = this.invokerMap.get(providerConfig.getInterfaceId());
if (null != uniqueIdInvoker) {
uniqueIdInvoker.unRegisterInvoker(providerConfig);
if (!uniqueIdInvoker.hasInvoker()) {
this.invokerMap.remove(providerConfig.getInterfaceId());
this.handlerRegistry.removeService(serverServiceDefinition);
}
} else {
this.handlerRegistry.removeService(serverServiceDefinition);
}
handlerRegistry.removeService(serverServiceDefinition);
invokerCnt.decrementAndGet();
} catch (Exception e) {
LOGGER.error("Unregister triple service error", e);
} finally {
this.lock.unlock();
}
if (closeIfNoEntry && invokerCnt.get() == 0) {
stop();
Expand Down
Loading