Skip to content

Commit

Permalink
Fix/concurrency problem in deque local (#1203)
Browse files Browse the repository at this point in the history
* add exemptLabels for stale bot

* upgrade rpc version 5.8.5-SNAPSHOT

* avoid concurrency problem in DEQUE_LOCAL

* avoid concurrency problem in DEQUE_LOCAL

Co-authored-by: liujianjun.ljj <liujianjun.ljj@antgroup.com>
  • Loading branch information
EvenLjj and liujianjun.ljj authored May 16, 2022
1 parent d107d22 commit 55c5446
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class RpcInternalContext implements Cloneable {
/**
* The constant DEQUE_LOCAL.
*/
private static final ThreadLocal<Deque<RpcInternalContext>> DEQUE_LOCAL = new TransmittableThreadLocal<Deque<RpcInternalContext>>();
private static final ThreadLocal<Deque<RpcInternalContext>> DEQUE_LOCAL = new ThreadLocal<Deque<RpcInternalContext>>();

/**
* 设置上下文
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -226,6 +232,46 @@ public Object call() throws Exception {
Assert.assertEquals(null, task1.get()[0]);
Assert.assertEquals("TransmittableThreadLocal-value-set-in-parent", task1.get()[1]);
}
}

@Test
public void testConcurrentCall() throws ExecutionException, InterruptedException {
for (int i = 0; i < 20; i++) {
testMultiThreadCall();
}
}

private void testMultiThreadCall() throws ExecutionException, InterruptedException {
RpcInternalContext.getContext();
RpcInternalContext.pushContext();
ExecutorService newThreadPool = new ThreadPoolExecutor(5, 10,
10L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>());
List<Future<String>> futureList = new ArrayList<>();
for(int i=0; i<20 ;i++){
Future<String> future = newThreadPool.submit(new Callable() {
@Override
public Object call() throws Exception {
try {
RpcInternalContext.getContext();
RpcInternalContext.pushContext();
RpcInternalContext.removeContext();
RpcInternalContext.popContext();
return null;
} catch (NoSuchElementException e) {
Assert.fail();
}
return null;
}
});
futureList.add(future);
}

for (Future future: futureList){
future.get();
}
RpcInvokeContext.removeContext();
RpcInternalContext.popContext();
RpcInternalContext.removeAllContext();
}
}

0 comments on commit 55c5446

Please sign in to comment.