Skip to content

Commit

Permalink
code optimization (#3297)
Browse files Browse the repository at this point in the history
  • Loading branch information
CrazyHZM authored and beiwei30 committed Jan 22, 2019
1 parent 0f86000 commit 215ed36
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public class Constants {

public static final int DEFAULT_FAILBACK_TIMES = 3;

public static final int MAX_PROXY_COUNT = 65535;

// default buffer size is 8k.
public static final int DEFAULT_BUFFER_SIZE = 8 * 1024;

Expand Down Expand Up @@ -480,16 +482,19 @@ public class Constants {

/**
* simple the registry for provider.
*
* @since 2.7.0
*/
public static final String SIMPLE_PROVIDER_CONFIG_KEY = "simple.provider.config";
/**
* simple the registry for consumer.
*
* @since 2.7.0
*/
public static final String SIMPLE_CONSUMER_CONFIG_KEY = "simple.consumer.config";
/**
* After simplify the registry, should add some parameter individually for provider.
*
* @since 2.7.0
*/
public static final String EXTRA_PROVIDER_CONFIG_KEYS_KEY = "extra.provider.keys";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.dubbo.common.bytecode;

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.utils.ClassHelper;
import org.apache.dubbo.common.utils.ReflectUtils;

Expand Down Expand Up @@ -77,7 +78,7 @@ public static Proxy getProxy(Class<?>... ics) {
* @return Proxy instance.
*/
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
if (ics.length > 65535) {
if (ics.length > Constants.MAX_PROXY_COUNT) {
throw new IllegalArgumentException("interface limit exceeded");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.lang.reflect.Proxy;

/**
* JavaassistRpcProxyFactory
* JdkRpcProxyFactory
*/
public class JdkProxyFactory extends AbstractProxyFactory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,61 +73,25 @@ public void encode(Channel channel, OutputStream output, Object message) throws
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);

byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE:
break;
case DubboCodec.RESPONSE_VALUE:
try {
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
setResponseResult(in, true, false, false);
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION:
try {
Object obj = in.readObject();
if (obj instanceof Throwable == false) {
throw new IOException("Response data error, expect Throwable, but get " + obj);
}
setException((Throwable) obj);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
setResponseResult(in, false, true, false);
break;
case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
try {
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
setResponseResult(in, false, false, true);
break;
case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
try {
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
setResponseResult(in, true, false, true);
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
try {
Object obj = in.readObject();
if (obj instanceof Throwable == false) {
throw new IOException("Response data error, expect Throwable, but get " + obj);
}
setException((Throwable) obj);
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
setResponseResult(in, false, true, false);
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
Expand Down Expand Up @@ -155,4 +119,28 @@ public void decode() throws Exception {
}
}

private void setResponseResult(ObjectInput in, boolean hasValue, boolean hasException, boolean hasAttachments) throws IOException {
try {
if (hasValue) {
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
}
if (hasException) {
Object obj = in.readObject();
if (obj instanceof Throwable == false) {
throw new IOException("Response data error, expect Throwable, but get " + obj);
}
setException((Throwable) obj);
}
if (hasAttachments) {
setAttachments((Map<String, String>) in.readObject(Map.class));
}
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,22 @@ public class DubboProtocol extends AbstractProtocol {
public static final int DEFAULT_PORT = 20880;
private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke";
private static DubboProtocol INSTANCE;
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger>
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<String, LazyConnectExchangeClient>();
private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
private final Set<String> optimizers = new ConcurrentHashSet<String>();
//consumer side export a stub service for dispatching event
//servicekey-stubmethods
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<String, String>();
/**
* <host:port,Exchanger>
*/
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<>();
/**
* <host:port,Exchanger>
*/
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();
private final Set<String> optimizers = new ConcurrentHashSet<>();
/**
* consumer side export a stub service for dispatching event
* servicekey-stubmethods
*/
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<>();
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

@Override
Expand Down Expand Up @@ -180,7 +188,8 @@ public DubboProtocol() {

public static DubboProtocol getDubboProtocol() {
if (INSTANCE == null) {
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME); // load
// load
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME);
}
return INSTANCE;
}
Expand Down Expand Up @@ -218,15 +227,16 @@ Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " +
exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
}

return exporter.getInvoker();
Expand Down Expand Up @@ -447,7 +457,7 @@ private ExchangeClient initClient(URL url) {

@Override
public void destroy() {
for (String key : new ArrayList<String>(serverMap.keySet())) {
for (String key : new ArrayList<>(serverMap.keySet())) {
ExchangeServer server = serverMap.remove(key);
if (server != null) {
try {
Expand All @@ -461,7 +471,7 @@ public void destroy() {
}
}

for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
for (String key : new ArrayList<>(referenceClientMap.keySet())) {
ExchangeClient client = referenceClientMap.remove(key);
if (client != null) {
try {
Expand All @@ -475,7 +485,7 @@ public void destroy() {
}
}

for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
for (String key : new ArrayList<>(ghostClientMap.keySet())) {
ExchangeClient client = ghostClientMap.remove(key);
if (client != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,19 @@
@SuppressWarnings("deprecation")
final class LazyConnectExchangeClient implements ExchangeClient {

// when this warning rises from invocation, program probably have bug.
static final String REQUEST_WITH_WARNING_KEY = "lazyclient_request_with_warning";
/**
* when this warning rises from invocation, program probably have bug.
*/
protected static final String REQUEST_WITH_WARNING_KEY = "lazyclient_request_with_warning";
private final static Logger logger = LoggerFactory.getLogger(LazyConnectExchangeClient.class);
protected final boolean requestWithWarning;
private final URL url;
private final ExchangeHandler requestHandler;
private final Lock connectLock = new ReentrantLock();
// lazy connect, initial state for connection
private final int warning_period = 5000;
/**
* lazy connect, initial state for connection
*/
private final boolean initialState;
private volatile ExchangeClient client;
private AtomicLong warningcount = new AtomicLong(0);
Expand Down Expand Up @@ -81,7 +86,7 @@ private void initClient() throws RemotingException {

@Override
public ResponseFuture request(Object request) throws RemotingException {
warning(request);
warning();
initClient();
return client.request(request);
}
Expand All @@ -102,19 +107,17 @@ public InetSocketAddress getRemoteAddress() {

@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
warning(request);
warning();
initClient();
return client.request(request, timeout);
}

/**
* If {@link #REQUEST_WITH_WARNING_KEY} is configured, then warn once every 5000 invocations.
*
* @param request
*/
private void warning(Object request) {
private void warning() {
if (requestWithWarning) {
if (warningcount.get() % 5000 == 0) {
if (warningcount.get() % warning_period == 0) {
logger.warn(new IllegalStateException("safe guard client , should not be called ,must have a bug."));
}
warningcount.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ public class TraceFilter implements Filter {

private static final String TRACE_COUNT = "trace.count";

private static final ConcurrentMap<String, Set<Channel>> tracers = new ConcurrentHashMap<String, Set<Channel>>();
private static final ConcurrentMap<String, Set<Channel>> tracers = new ConcurrentHashMap<>();

public static void addTracer(Class<?> type, String method, Channel channel, int max) {
channel.setAttribute(TRACE_MAX, max);
channel.setAttribute(TRACE_COUNT, new AtomicInteger());
String key = method != null && method.length() > 0 ? type.getName() + "." + method : type.getName();
Set<Channel> channels = tracers.get(key);
if (channels == null) {
tracers.putIfAbsent(key, new ConcurrentHashSet<Channel>());
tracers.putIfAbsent(key, new ConcurrentHashSet<>());
channels = tracers.get(key);
}
channels.add(channel);
Expand Down Expand Up @@ -87,13 +87,13 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
channels = tracers.get(key);
}
if (CollectionUtils.isNotEmpty(channels)) {
for (Channel channel : new ArrayList<Channel>(channels)) {
for (Channel channel : new ArrayList<>(channels)) {
if (channel.isConnected()) {
try {
int max = 1;
Integer m = (Integer) channel.getAttribute(TRACE_MAX);
if (m != null) {
max = (int) m;
max = m;
}
int count = 0;
AtomicInteger c = (AtomicInteger) channel.getAttribute(TRACE_COUNT);
Expand Down

0 comments on commit 215ed36

Please sign in to comment.