Skip to content

Commit

Permalink
New threadLocal provides more performance. (#1745)
Browse files Browse the repository at this point in the history
* SerializerFactory 获取Serializer时,锁住整个hashmap,导致整个过程被block

* 单元测试。保证一个class只有一个serializer和deserializer。单线程和多线程测试

* 增加线程数 50 模拟多个线程来获取serializer和deserializer

* 当cores线程数全都使用的情况下,默认线程池会把任务放入到队列中。队列满则再创建线程(总数不会超过Max线程数)
增强线程池:在请求量阶段性出现高峰时使用
特性:cores线程全部使用的情况下,优先创建线程(总数不会超过max),当max个线程全都在忙的情况下,才将任务放入队列。请求量下降时,线程池会自动维持cores个线程,多余的线程退出。

* 当cores线程数全都使用的情况下,默认线程池会把任务放入到队列中。队列满则再创建线程(总数不会超过Max线程数)
增强线程池:在请求量阶段性出现高峰时使用
特性:cores线程全部使用的情况下,优先创建线程(总数不会超过max),当max个线程全都在忙的情况下,才将任务放入队列。请求量下降时,线程池会自动维持cores个线程,多余的线程退出。

* 补全单元测试,测试扩展是否生效

* 错误命名

* 增加@OverRide注解
long 初始化赋值时,小写l改为大写L防止误读

* 修复单元测试

* remove enhanced

* remove enhanced

* Faster ThreadLocal impl in internal use
* Used in RpcContext`s LOCAL field.
* Faster get than the traditional ThreadLocal

* add License

* fix ci failed

* fix ci failed

* fix ci failed

* fix ci failed

* fix ci failed

* remove author info

* fix destroy method

* fix bug at method size.
  • Loading branch information
carryxyh authored and beiwei30 committed May 15, 2018
1 parent 6f3fcab commit 04eacfe
Show file tree
Hide file tree
Showing 11 changed files with 678 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.common.threadlocal;

/**
* InternalThread
*/
public class InternalThread extends Thread {

private InternalThreadLocalMap threadLocalMap;

public InternalThread() {
}

public InternalThread(Runnable target) {
super(target);
}

public InternalThread(ThreadGroup group, Runnable target) {
super(group, target);
}

public InternalThread(String name) {
super(name);
}

public InternalThread(ThreadGroup group, String name) {
super(group, name);
}

public InternalThread(Runnable target, String name) {
super(target, name);
}

public InternalThread(ThreadGroup group, Runnable target, String name) {
super(group, target, name);
}

public InternalThread(ThreadGroup group, Runnable target, String name, long stackSize) {
super(group, target, name, stackSize);
}

/**
* Returns the internal data structure that keeps the threadLocal variables bound to this thread.
* Note that this method is for internal use only, and thus is subject to change at any time.
*/
public final InternalThreadLocalMap threadLocalMap() {
return threadLocalMap;
}

/**
* Sets the internal data structure that keeps the threadLocal variables bound to this thread.
* Note that this method is for internal use only, and thus is subject to change at any time.
*/
public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
this.threadLocalMap = threadLocalMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.common.threadlocal;

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/**
* InternalThreadLocal
* A special variant of {@link ThreadLocal} that yields higher access performance when accessed from a
* {@link InternalThread}.
* <p></p>
* Internally, a {@link InternalThread} uses a constant index in an array, instead of using hash code and hash table,
* to look for a variable. Although seemingly very subtle, it yields slight performance advantage over using a hash
* table, and it is useful when accessed frequently.
* <p></p>
* This design is learning from {@see io.netty.util.concurrent.FastThreadLocal} which is in Netty.
*/
public class InternalThreadLocal<V> {

private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

private final int index;

public InternalThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}

/**
* Removes all {@link InternalThreadLocal} variables bound to the current thread. This operation is useful when you
* are in a container environment, and you don't want to leave the thread local variables in the threads you do not
* manage.
*/
@SuppressWarnings("unchecked")
public static void removeAll() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}

try {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
Set<InternalThreadLocal<?>> variablesToRemove = (Set<InternalThreadLocal<?>>) v;
for (InternalThreadLocal<?> tlv : variablesToRemove) {
tlv.remove(threadLocalMap);
}
}
} finally {
InternalThreadLocalMap.remove();
}
}

/**
* Returns the number of thread local variables bound to the current thread.
*/
public static int size() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return 0;
} else {
return threadLocalMap.size();
}
}

public static void destroy() {
InternalThreadLocalMap.destroy();
}

@SuppressWarnings("unchecked")
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, InternalThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<InternalThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<InternalThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
variablesToRemove = (Set<InternalThreadLocal<?>>) v;
}

variablesToRemove.add(variable);
}

@SuppressWarnings("unchecked")
private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, InternalThreadLocal<?> variable) {

Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);

if (v == InternalThreadLocalMap.UNSET || v == null) {
return;
}

Set<InternalThreadLocal<?>> variablesToRemove = (Set<InternalThreadLocal<?>>) v;
variablesToRemove.remove(variable);
}

/**
* Returns the current value for the current thread
*/
@SuppressWarnings("unchecked")
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}

return initialize(threadLocalMap);
}

private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
v = initialValue();
} catch (Exception e) {
throw new RuntimeException(e);
}

threadLocalMap.setIndexedVariable(index, v);
addToVariablesToRemove(threadLocalMap, this);
return v;
}

/**
* Sets the value for the current thread.
*/
public final void set(V value) {
if (value == null || value == InternalThreadLocalMap.UNSET) {
remove();
} else {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
if (threadLocalMap.setIndexedVariable(index, value)) {
addToVariablesToRemove(threadLocalMap, this);
}
}
}

/**
* Sets the value to uninitialized; a proceeding call to get() will trigger a call to initialValue().
*/
@SuppressWarnings("unchecked")
public final void remove() {
remove(InternalThreadLocalMap.getIfSet());
}

/**
* Sets the value to uninitialized for the specified thread local map;
* a proceeding call to get() will trigger a call to initialValue().
* The specified thread local map must be for the current thread.
*/
@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}

Object v = threadLocalMap.removeIndexedVariable(index);
removeFromVariablesToRemove(threadLocalMap, this);

if (v != InternalThreadLocalMap.UNSET) {
try {
onRemoval((V) v);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

/**
* Returns the initial value for this thread-local variable.
*/
protected V initialValue() throws Exception {
return null;
}

/**
* Invoked when this thread local variable is removed by {@link #remove()}.
*/
protected void onRemoval(@SuppressWarnings("unused") V value) throws Exception {
}
}
Loading

1 comment on commit 04eacfe

@CodeIngL
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is similar to netty。

Please sign in to comment.