Skip to content

Commit

Permalink
Qos heart (apache#3170)
Browse files Browse the repository at this point in the history
* qos heart question fix apache#3165

* modify

* judge if it's a IdleStateEvent

* add UT

* modify
  • Loading branch information
CrazyHZM authored and khanimteyaz committed Jan 18, 2019
1 parent e07735b commit ad7497e
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledFuture;

public class ExecutorUtil {
private static final Logger logger = LoggerFactory.getLogger(ExecutorUtil.class);
Expand All @@ -45,9 +46,10 @@ public static boolean isTerminated(Executor executor) {

/**
* Use the shutdown pattern from:
* https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
* https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
*
* @param executor the Executor to shutdown
* @param timeout the timeout in milliseconds before termination
* @param timeout the timeout in milliseconds before termination
*/
public static void gracefulShutdown(Executor executor, int timeout) {
if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
Expand Down Expand Up @@ -131,4 +133,11 @@ public static URL setThreadName(URL url, String defaultName) {
url = url.addParameter(Constants.THREAD_NAME_KEY, name);
return url;
}

public static void cancelScheduledFuture(ScheduledFuture<?> scheduledFuture) {
ScheduledFuture<?> future = scheduledFuture;
if (future != null && !future.isCancelled()) {
future.cancel(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.monitor.Monitor;
import org.apache.dubbo.monitor.MonitorService;
Expand Down Expand Up @@ -209,7 +210,7 @@ public boolean isAvailable() {
@Override
public void destroy() {
try {
sendFuture.cancel(true);
ExecutorUtil.cancelScheduledFuture(sendFuture);
} catch (Throwable t) {
logger.error("Unexpected error occur at cancel sender timer, cause: " + t.getMessage(), t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.ScheduledFuture;
import org.apache.dubbo.common.utils.ExecutorUtil;

import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -93,6 +95,14 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ExecutorUtil.cancelScheduledFuture(welcomeFuture);
ctx.close();
}
}

// G for GET, and P for POST
private static boolean isHttp(int magic) {
return magic == 'G' || magic == 'P';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

/**
* DubboRegistry
*
*/
public class DubboRegistry extends FailbackRegistry {

Expand Down Expand Up @@ -124,9 +123,7 @@ public void destroy() {
super.destroy();
try {
// Cancel the reconnection timer
if (!reconnectFuture.isCancelled()) {
reconnectFuture.cancel(true);
}
ExecutorUtil.cancelScheduledFuture(reconnectFuture);
} catch (Throwable t) {
logger.warn("Failed to cancel reconnect timer", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,7 @@ public boolean isAvailable() {
public void destroy() {
super.destroy();
try {
if (cleanFuture != null) {
cleanFuture.cancel(true);
}
ExecutorUtil.cancelScheduledFuture(cleanFuture);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
Expand Down Expand Up @@ -342,8 +340,8 @@ protected void unregistered(URL url) {
if (urls != null) {
urls.remove(url);
}
if (urls == null || urls.isEmpty()){
if (urls == null){
if (urls == null || urls.isEmpty()) {
if (urls == null) {
urls = new ConcurrentHashSet<URL>();
}
URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.p2p.exchange.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.IOUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.NetUtils;
Expand Down Expand Up @@ -70,9 +71,7 @@ public void run() {
public void close() {
super.close();
try {
if (!checkModifiedFuture.isCancelled()) {
checkModifiedFuture.cancel(true);
}
ExecutorUtil.cancelScheduledFuture(checkModifiedFuture);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.p2p.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.IOUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.NetUtils;
Expand Down Expand Up @@ -67,9 +68,7 @@ public void run() {
public void close() {
super.close();
try {
if (!checkModifiedFuture.isCancelled()) {
checkModifiedFuture.cancel(true);
}
ExecutorUtil.cancelScheduledFuture(checkModifiedFuture);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
Expand Down

0 comments on commit ad7497e

Please sign in to comment.