Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async optimization #3738

Merged
merged 41 commits into from
Apr 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
6d76d13
Result implement CF
chickenlj Mar 1, 2019
f5c87b8
Result implement CF
chickenlj Mar 4, 2019
588dc7a
Result implement CF
chickenlj Mar 5, 2019
765f96f
Add AsyncRpcResult
chickenlj Mar 6, 2019
ff4267f
Fix bugs and refactor Filter
chickenlj Mar 7, 2019
ecc4737
Try to add onSend onError for Filter
chickenlj Mar 13, 2019
7a21ddd
invoke different filter method according to result status.
chickenlj Mar 19, 2019
5224a42
make generic work with async call, including add $invokeAsync
chickenlj Mar 14, 2019
475884f
refactor legacy Filter implementation to work with onResponse.
chickenlj Mar 14, 2019
30e12a4
demo changes
chickenlj Mar 15, 2019
940efe5
Fixes #3620, provider attachment lose on consumer side, fix this by r…
chickenlj Mar 19, 2019
fbdaeb3
AsyncRpcResult should always holds an Invocation instance
chickenlj Mar 19, 2019
34887ed
refactor filter signature
chickenlj Mar 19, 2019
3c2ffc8
reimplement embedded Filters
chickenlj Mar 20, 2019
97bb764
Merge branch '3.x-dev' into AsyncRpcResult-Two
chickenlj Mar 21, 2019
725bf83
use ProviderModel modification in 3.x
chickenlj Mar 21, 2019
e62fad5
Fix address notification processing workflow after merging 3.x branch
chickenlj Mar 21, 2019
f9d7288
Fix UT
chickenlj Mar 21, 2019
47e1d45
Fix UT
chickenlj Mar 21, 2019
c1be6c6
Unit test of JValidator; Clean code of JValidator (#3723)
tswstarplanet Mar 25, 2019
ad72159
Fixes #3625 (#3730)
John-Smile Mar 26, 2019
bba1d1a
Merge branch 'master' into AsyncRpcResult-Two
chickenlj Mar 26, 2019
da7ee7c
Fix conflict when merging master and 3.x
chickenlj Mar 26, 2019
4dd6683
Fix conflict when merging master and 3.x
chickenlj Mar 26, 2019
0b168ba
Merge branch '3.x-dev' into AsyncRpcResult-Two
chickenlj Mar 26, 2019
eca70ac
Result interface itself has Future status.
chickenlj Mar 26, 2019
1a2d82b
Fix DefaultFuture UT
chickenlj Mar 27, 2019
2088ae7
Wrap all protocol Invoker with AsyncToSyncInvoker & Fix UT
chickenlj Mar 31, 2019
a5231fe
Add license
chickenlj Mar 31, 2019
610a8a4
fix UT
chickenlj Mar 31, 2019
ce96d9b
Fix ut in MonitorFilterTest
chickenlj Apr 1, 2019
abe997a
avoid duplicate async to sync wrapper
chickenlj Apr 1, 2019
9455c93
return async result in CacheFilter.
chickenlj Apr 1, 2019
ada0050
fix UT in CacheFilterTest
chickenlj Apr 1, 2019
0c6ca2c
Add generic condition check to GenericFilter callback.
chickenlj Apr 9, 2019
9f1d6ff
Fix UT
chickenlj Apr 9, 2019
3b08204
Merge branch '3.x-dev' into AsyncRpcResult-Two
chickenlj Apr 9, 2019
133c265
Get generic from RpcContext if the value in Invocation is empty.
chickenlj Apr 9, 2019
e6a7c0b
Fix RSocketProtocol to meet AbstractProtocol adjustment
chickenlj Apr 9, 2019
ec3b556
rename RpcResult to AppResponse to help avoid confusion with AsyncRpc…
chickenlj Apr 10, 2019
35b17de
RSocket module switch to AsyncRpcResult
chickenlj Apr 10, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.dubbo.common.timer.Timer;
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;

Expand Down Expand Up @@ -99,7 +99,7 @@ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, Load
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
addFailed(loadbalance, invocation, invokers, invoker);
return new RpcResult(); // ignore
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;

Expand Down Expand Up @@ -50,7 +50,7 @@ public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBal
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return new RpcResult(); // ignore
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.concurrent.atomic.AtomicInteger;

/**
* NOTICE! This implementation does not work well with async call.
*
* Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources.
*
* <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a>
Expand Down Expand Up @@ -66,7 +68,6 @@ public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, L
} else {
selected = new ArrayList<>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {
//Avoid add the same invoker several times.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.Merger;
Expand All @@ -41,12 +41,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* NOTICE! Does not work with async call.
* @param <T>
*/
@SuppressWarnings("unchecked")
public class MergeableClusterInvoker<T> extends AbstractClusterInvoker<T> {

Expand Down Expand Up @@ -86,26 +87,19 @@ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, Load
returnType = null;
}

Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
Map<String, Result> results = new HashMap<>();
for (final Invoker<T> invoker : invokers) {
Future<Result> future = executor.submit(new Callable<Result>() {
@Override
public Result call() throws Exception {
return invoker.invoke(new RpcInvocation(invocation, invoker));
}
});
results.put(invoker.getUrl().getServiceKey(), future);
results.put(invoker.getUrl().getServiceKey(), invoker.invoke(new RpcInvocation(invocation, invoker)));
}

Object result = null;

List<Result> resultList = new ArrayList<Result>(results.size());

int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
Future<Result> future = entry.getValue();
for (Map.Entry<String, Result> entry : results.entrySet()) {
Result asyncResult = entry.getValue();
try {
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
Result r = asyncResult.get();
if (r.hasException()) {
log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
" failed: " + r.getException().getMessage(),
Expand All @@ -119,13 +113,13 @@ public Result call() throws Exception {
}

if (resultList.isEmpty()) {
return new RpcResult((Object) null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else if (resultList.size() == 1) {
return resultList.iterator().next();
}

if (returnType == void.class) {
return new RpcResult((Object) null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
}

if (merger.startsWith(".")) {
Expand Down Expand Up @@ -173,7 +167,7 @@ public Result call() throws Exception {
throw new RpcException("There is no merger to merge result.");
}
}
return new RpcResult(result);
return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.support.MockInvoker;

Expand Down Expand Up @@ -113,7 +113,7 @@ private Result doMockInvoke(Invocation invocation, RpcException e) {
result = minvoker.invoke(invocation);
} catch (RpcException me) {
if (me.isBiz()) {
result = new RpcResult(me.getCause());
result = AsyncRpcResult.newDefaultAsyncResult(me.getCause(), invocation);
} else {
throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;

import org.junit.jupiter.api.Assertions;
Expand All @@ -48,7 +48,7 @@ public class StickyTest {
private Invoker<StickyTest> invoker2 = mock(Invoker.class);
private RpcInvocation invocation;
private Directory<StickyTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();
private StickyClusterInvoker<StickyTest> clusterinvoker = null;
private URL url = URL.valueOf("test://test:11/test?"
+ "&loadbalance=roundrobin"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ public Map<String, String> getAttachments() {
return attachments;
}

@Override
public void setAttachment(String key, String value) {

}

@Override
public void setAttachmentIfAbsent(String key, String value) {

}

public Invoker<?> getInvoker() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.RouterFactory;
Expand Down Expand Up @@ -52,7 +52,7 @@ public class FileRouterEngineTest {
Invoker<FileRouterEngineTest> invoker2 = mock(Invoker.class);
Invocation invocation;
StaticDirectory<FileRouterEngineTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();
private RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();

@BeforeAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.LogUtil;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.filter.DemoService;

Expand All @@ -48,7 +48,7 @@ public class FailSafeClusterInvokerTest {
Invoker<DemoService> invoker = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<DemoService> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.DubboAppender;
import org.apache.dubbo.common.utils.LogUtil;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;

import org.apache.log4j.Level;
Expand Down Expand Up @@ -59,7 +59,7 @@ public class FailbackClusterInvokerTest {
Invoker<FailbackClusterInvokerTest> invoker = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<FailbackClusterInvokerTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.jupiter.api.Assertions;
Expand All @@ -47,7 +47,7 @@ public class FailfastClusterInvokerTest {
Invoker<FailfastClusterInvokerTest> invoker1 = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<FailfastClusterInvokerTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class FailoverClusterInvokerTest {
private Invoker<FailoverClusterInvokerTest> invoker2 = mock(Invoker.class);
private RpcInvocation invocation = new RpcInvocation();
private Directory<FailoverClusterInvokerTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -50,7 +50,7 @@ public class ForkingClusterInvokerTest {
private Invoker<ForkingClusterInvokerTest> invoker3 = mock(Invoker.class);
private RpcInvocation invocation = new RpcInvocation();
private Directory<ForkingClusterInvokerTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();

@BeforeEach
public void setUp() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -119,7 +120,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
return MenuService.class;
}
if ("invoke".equals(method.getName())) {
return new RpcResult(firstMenu);
return AsyncRpcResult.newDefaultAsyncResult(firstMenu, invocation);
}
return null;
}
Expand All @@ -135,7 +136,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
return MenuService.class;
}
if ("invoke".equals(method.getName())) {
return new RpcResult(secondMenu);
return AsyncRpcResult.newDefaultAsyncResult(secondMenu, invocation);
}
return null;
}
Expand Down Expand Up @@ -195,14 +196,14 @@ public void testAddMenu() throws Exception {
given(firstInvoker.getUrl()).willReturn(
url.addParameter(Constants.GROUP_KEY, "first"));
given(firstInvoker.getInterface()).willReturn(MenuService.class);
given(firstInvoker.invoke(invocation)).willReturn(new RpcResult())
given(firstInvoker.invoke(invocation)).willReturn(new AppResponse())
;
given(firstInvoker.isAvailable()).willReturn(true);

given(secondInvoker.getUrl()).willReturn(
url.addParameter(Constants.GROUP_KEY, "second"));
given(secondInvoker.getInterface()).willReturn(MenuService.class);
given(secondInvoker.invoke(invocation)).willReturn(new RpcResult())
given(secondInvoker.invoke(invocation)).willReturn(new AppResponse())
;
given(secondInvoker.isAvailable()).willReturn(true);

Expand Down
Loading