diff --git a/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/server/http/AbstractHttpServer.java b/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/server/http/AbstractHttpServer.java index 5b2ec159e..0dd57904e 100644 --- a/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/server/http/AbstractHttpServer.java +++ b/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/server/http/AbstractHttpServer.java @@ -90,7 +90,7 @@ public void init(ServerConfig serverConfig) { // 启动线程池 this.bizThreadPool = initThreadPool(serverConfig); // 服务端处理器 - this.serverHandler = new HttpServerHandler(bizThreadPool); + this.serverHandler = new HttpServerHandler(); // set default transport config this.serverTransportConfig.setContainer(container); @@ -118,6 +118,9 @@ public void start() { return; } try { + // 启动线程池 + this.bizThreadPool = initThreadPool(serverConfig); + this.serverHandler.setBizThreadPool(bizThreadPool); serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig); started = serverTransport.start(); @@ -156,6 +159,14 @@ public void stop() { // 关闭端口,不关闭线程池 serverTransport.stop(); serverTransport = null; + + // 关闭线程池 + if (bizThreadPool != null) { + bizThreadPool.shutdown(); + bizThreadPool = null; + serverHandler.setBizThreadPool(null); + } + started = false; if (EventBus.isEnable(ServerStoppedEvent.class)) { @@ -216,12 +227,6 @@ public void destroy() { } stop(); - - // 关闭线程池 - if (bizThreadPool != null) { - bizThreadPool.shutdown(); - } - serverHandler = null; } diff --git a/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/server/http/HttpServerHandler.java b/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/server/http/HttpServerHandler.java index 1f6025551..0a249dde4 100644 --- a/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/server/http/HttpServerHandler.java +++ b/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/server/http/HttpServerHandler.java @@ -61,10 +61,6 @@ public AtomicInteger getProcessingCount() { return processingCount; } - public HttpServerHandler(ThreadPoolExecutor bizThreadPool) { - this.bizThreadPool = bizThreadPool; - } - @Override public void registerChannel(AbstractChannel nettyChannel) { @@ -139,4 +135,9 @@ public boolean checkService(String serviceName, String methodName) { public ThreadPoolExecutor getBizThreadPool() { return bizThreadPool; } + + public HttpServerHandler setBizThreadPool(ThreadPoolExecutor bizThreadPool) { + this.bizThreadPool = bizThreadPool; + return this; + } } \ No newline at end of file diff --git a/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/http/AbstractHttp2ClientTransport.java b/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/http/AbstractHttp2ClientTransport.java index 65588ff0f..b0fc1d8f7 100644 --- a/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/http/AbstractHttp2ClientTransport.java +++ b/extension-impl/remoting-http/src/main/java/com/alipay/sofa/rpc/transport/http/AbstractHttp2ClientTransport.java @@ -104,10 +104,14 @@ public AbstractHttp2ClientTransport(ClientTransportConfig transportConfig) { * 服务端提供者信息 */ protected final ProviderInfo providerInfo; + /** + * Start from 3 (because 1 is setting stream) + */ + private final static int START_STREAM_ID = 3; /** * StreamId, start from 3 (because 1 is setting stream) */ - protected final AtomicInteger streamId = new AtomicInteger(3); + protected final AtomicInteger streamId = new AtomicInteger(); /** * 正在发送的调用数量 */ @@ -131,6 +135,9 @@ public AbstractHttp2ClientTransport(ClientTransportConfig transportConfig) { @Override public void connect() { + if (isAvailable()) { + return; + } EventLoopGroup workerGroup = NettyHelper.getClientIOEventLoopGroup(); Http2ClientInitializer initializer = new Http2ClientInitializer(transportConfig); try { @@ -152,6 +159,8 @@ public void connect() { http2SettingsHandler.awaitSettings(transportConfig.getConnectTimeout(), TimeUnit.MILLISECONDS); responseChannelHandler = initializer.responseHandler(); + // RESET streamId + streamId.set(START_STREAM_ID); } catch (Exception e) { throw new SofaRpcException(RpcErrorType.CLIENT_NETWORK, e); } @@ -295,8 +304,8 @@ protected void doSend(final SofaRequest request, AbstractHttpClientHandler callb TIMEOUT_TIMER.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - Map.Entry entry = - responseChannelHandler.removePromise(requestId); + Map.Entry entry = responseChannelHandler + .removePromise(requestId); if (entry != null) { ClientHandler handler = entry.getValue(); Exception e = timeoutException(request, timeoutMills, null); diff --git a/extension-impl/remoting-resteasy/src/main/java/com/alipay/sofa/rpc/server/rest/RestServer.java b/extension-impl/remoting-resteasy/src/main/java/com/alipay/sofa/rpc/server/rest/RestServer.java index 199f046de..f4c1444f4 100644 --- a/extension-impl/remoting-resteasy/src/main/java/com/alipay/sofa/rpc/server/rest/RestServer.java +++ b/extension-impl/remoting-resteasy/src/main/java/com/alipay/sofa/rpc/server/rest/RestServer.java @@ -69,6 +69,7 @@ public class RestServer implements Server { @Override public void init(ServerConfig serverConfig) { this.serverConfig = serverConfig; + httpServer = buildServer(); } private SofaNettyJaxrsServer buildServer() { @@ -136,7 +137,6 @@ public void start() { } // 绑定到端口 try { - httpServer = buildServer(); httpServer.start(); if (LOGGER.isInfoEnabled()) { LOGGER.info("Start the http rest server at port {}", serverConfig.getPort()); @@ -170,7 +170,6 @@ public void stop() { LOGGER.info("Stop the http rest server at port {}", serverConfig.getPort()); } httpServer.stop(); - httpServer = null; } catch (Exception e) { LOGGER.error("Stop the http rest server at port " + serverConfig.getPort() + " error !", e); } @@ -222,6 +221,7 @@ public void unRegisterProcessor(ProviderConfig providerConfig, boolean closeIfNo @Override public void destroy() { stop(); + httpServer = null; } @Override diff --git a/extension-impl/remoting-resteasy/src/main/java/com/alipay/sofa/rpc/server/rest/SofaNettyJaxrsServer.java b/extension-impl/remoting-resteasy/src/main/java/com/alipay/sofa/rpc/server/rest/SofaNettyJaxrsServer.java index d934a3bbd..10d015f45 100644 --- a/extension-impl/remoting-resteasy/src/main/java/com/alipay/sofa/rpc/server/rest/SofaNettyJaxrsServer.java +++ b/extension-impl/remoting-resteasy/src/main/java/com/alipay/sofa/rpc/server/rest/SofaNettyJaxrsServer.java @@ -20,7 +20,6 @@ import com.alipay.sofa.rpc.common.struct.NamedThreadFactory; import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.config.ServerConfig; - import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; @@ -64,7 +63,7 @@ public class SofaNettyJaxrsServer implements EmbeddedJaxrsServer { private final ServerConfig serverConfig; - protected ServerBootstrap bootstrap = new ServerBootstrap(); + protected ServerBootstrap bootstrap = null; protected String hostname = null; protected int port = 8080; protected ResteasyDeployment deployment = new SofaResteasyDeployment(); // CHANGE: 使用sofa的类 @@ -224,7 +223,7 @@ public void start() { serverConfig.isDaemon())); } // Configure the server. - bootstrap + bootstrap = new ServerBootstrap() .group(eventLoopGroup) .channel( (serverConfig != null && serverConfig.isEpoll()) ? EpollServerSocketChannel.class @@ -293,5 +292,6 @@ public void stop() { eventExecutor.shutdownGracefully().sync(); } catch (Exception ignore) { // NOPMD } + bootstrap = null; } } diff --git a/test/test-common/src/main/java/com/alipay/sofa/rpc/test/TestUtils.java b/test/test-common/src/main/java/com/alipay/sofa/rpc/test/TestUtils.java new file mode 100644 index 000000000..37a6ccd14 --- /dev/null +++ b/test/test-common/src/main/java/com/alipay/sofa/rpc/test/TestUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test; + +import java.util.concurrent.Callable; + +/** + * @author GengZhang + */ +public class TestUtils { + + public static T delayGet(Callable callable, T expect, int period, int times) { + T result = null; + int i = 0; + while (i++ < times) { + try { + Thread.sleep(period); + result = callable.call(); + if (result != null && result.equals(expect)) { + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + return result; + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/client/bolt/BoltDirectUrlTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/client/bolt/BoltDirectUrlTest.java new file mode 100644 index 000000000..6eedc5919 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/client/bolt/BoltDirectUrlTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.client.bolt; + +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.core.exception.SofaRpcException; +import com.alipay.sofa.rpc.test.ActivelyDestroyTest; +import com.alipay.sofa.rpc.test.HelloService; +import com.alipay.sofa.rpc.test.HelloServiceImpl; +import com.alipay.sofa.rpc.test.TestUtils; +import org.junit.Assert; + +import java.util.concurrent.Callable; + +/** + * @author GengZhang + */ +public class BoltDirectUrlTest extends ActivelyDestroyTest { + + // @Test + // FIXME 目前bolt的IO线程关闭时未释放,暂不支持本测试用例 + public void testAll() { + // 只有2个线程 执行 + ServerConfig serverConfig = new ServerConfig() + .setPort(12300) + .setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT) + .setDaemon(true); + + // 发布一个服务,每个请求要执行1秒 + ProviderConfig providerConfig = new ProviderConfig() + .setInterfaceId(HelloService.class.getName()) + .setRef(new HelloServiceImpl()) + .setBootstrap("bolt") + .setApplication(new ApplicationConfig().setAppName("serverApp")) + .setServer(serverConfig) + .setRegister(false); + providerConfig.export(); + + final ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(HelloService.class.getName()) + .setDirectUrl("bolt://127.0.0.1:12300") + .setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT) + .setBootstrap("bolt") + .setApplication(new ApplicationConfig().setAppName("clientApp")) + .setReconnectPeriod(1000); + + HelloService helloService = consumerConfig.refer(); + + Assert.assertNotNull(helloService.sayHello("xx", 22)); + + serverConfig.getServer().stop(); + + // 关闭后再调用一个抛异常 + try { + helloService.sayHello("xx", 22); + } catch (Exception e) { + // 应该抛出异常 + Assert.assertTrue(e instanceof SofaRpcException); + } + + Assert.assertTrue(TestUtils.delayGet(new Callable() { + @Override + public Boolean call() throws Exception { + return CommonUtils.isEmpty(consumerConfig.getConsumerBootstrap() + .getCluster().getConnectionHolder().getAvailableConnections()); + } + }, true, 50, 40)); + + serverConfig.getServer().start(); + // 等待客户端重连服务端 + Assert.assertTrue(TestUtils.delayGet(new Callable() { + @Override + public Boolean call() throws Exception { + return CommonUtils.isNotEmpty(consumerConfig.getConsumerBootstrap() + .getCluster().getConnectionHolder().getAvailableConnections()); + } + }, true, 50, 60)); + + Assert.assertNotNull(helloService.sayHello("xx", 22)); + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/client/h2c/H2cDirectUrlTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/client/h2c/H2cDirectUrlTest.java new file mode 100644 index 000000000..f91b78cee --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/client/h2c/H2cDirectUrlTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.client.h2c; + +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.core.exception.SofaRpcException; +import com.alipay.sofa.rpc.server.http.ExampleObj; +import com.alipay.sofa.rpc.server.http.HttpService; +import com.alipay.sofa.rpc.server.http.HttpServiceImpl; +import com.alipay.sofa.rpc.test.ActivelyDestroyTest; +import com.alipay.sofa.rpc.test.TestUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.Callable; + +/** + * @author GengZhang + */ +public class H2cDirectUrlTest extends ActivelyDestroyTest { + + @Test + public void testAll() throws InterruptedException { + + // 只有1个线程 执行 + ServerConfig serverConfig = new ServerConfig() + .setPort(12300) + .setProtocol(RpcConstants.PROTOCOL_TYPE_H2C) + .setDaemon(true); + + // 发布一个服务,每个请求要执行1秒 + ProviderConfig providerConfig = new ProviderConfig() + .setInterfaceId(HttpService.class.getName()) + .setRef(new HttpServiceImpl()) + .setBootstrap("h2c") + .setApplication(new ApplicationConfig().setAppName("serverApp")) + .setServer(serverConfig) + .setRegister(false); + providerConfig.export(); + + final ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(HttpService.class.getName()) + .setDirectUrl("h2c://127.0.0.1:12300") + .setProtocol(RpcConstants.PROTOCOL_TYPE_H2C) + .setBootstrap("h2c") + .setApplication(new ApplicationConfig().setAppName("clientApp")) + .setReconnectPeriod(1000); + + HttpService httpService = consumerConfig.refer(); + + ExampleObj request = new ExampleObj(); + request.setId(200); + request.setName("yyy"); + ExampleObj response = httpService.object(request); + Assert.assertEquals(200, response.getId()); + Assert.assertEquals("yyyxx", response.getName()); + + serverConfig.getServer().stop(); + + // 关闭后再调用一个抛异常 + try { + httpService.object(request); + } catch (Exception e) { + // 应该抛出异常 + Assert.assertTrue(e instanceof SofaRpcException); + } + + Assert.assertTrue(TestUtils.delayGet(new Callable() { + @Override + public Boolean call() throws Exception { + return CommonUtils.isEmpty(consumerConfig.getConsumerBootstrap() + .getCluster().getConnectionHolder().getAvailableConnections()); + } + }, true, 50, 40)); + + serverConfig.getServer().start(); + // 等待客户端重连服务端 + Assert.assertTrue(TestUtils.delayGet(new Callable() { + @Override + public Boolean call() throws Exception { + return CommonUtils.isNotEmpty(consumerConfig.getConsumerBootstrap() + .getCluster().getConnectionHolder().getAvailableConnections()); + } + }, true, 50, 60)); + + response = httpService.object(request); + Assert.assertEquals(200, response.getId()); + Assert.assertEquals("yyyxx", response.getName()); + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/client/rest/RestDirectUrlTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/client/rest/RestDirectUrlTest.java new file mode 100644 index 000000000..7ab0f2b0b --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/client/rest/RestDirectUrlTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.client.rest; + +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.core.exception.SofaRpcException; +import com.alipay.sofa.rpc.server.rest.RestService; +import com.alipay.sofa.rpc.server.rest.RestServiceImpl; +import com.alipay.sofa.rpc.test.ActivelyDestroyTest; +import com.alipay.sofa.rpc.test.TestUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.Callable; + +/** + * @author GengZhang + */ +public class RestDirectUrlTest extends ActivelyDestroyTest { + + @Test + public void testAll() { + + // 只有1个线程 执行 + ServerConfig serverConfig = new ServerConfig() + .setPort(12300) + .setProtocol(RpcConstants.PROTOCOL_TYPE_REST) + .setDaemon(true); + + // 发布一个服务,每个请求要执行1秒 + ProviderConfig providerConfig = new ProviderConfig() + .setInterfaceId(RestService.class.getName()) + .setRef(new RestServiceImpl()) + .setBootstrap("rest") + .setApplication(new ApplicationConfig().setAppName("serverApp")) + .setServer(serverConfig) + .setRegister(false); + providerConfig.export(); + + final ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(RestService.class.getName()) + .setDirectUrl("rest://127.0.0.1:12300") + .setProtocol(RpcConstants.PROTOCOL_TYPE_REST) + .setBootstrap("rest") + .setApplication(new ApplicationConfig().setAppName("clientApp")) + .setReconnectPeriod(1000); + + RestService restService = consumerConfig.refer(); + + Assert.assertEquals(restService.query(11), "hello world !null"); + + serverConfig.getServer().stop(); + + // 关闭后再调用一个抛异常 + try { + restService.query(11); + } catch (Exception e) { + // 应该抛出异常 + Assert.assertTrue(e instanceof SofaRpcException); + } + + Assert.assertTrue(TestUtils.delayGet(new Callable() { + @Override + public Boolean call() throws Exception { + return CommonUtils.isEmpty(consumerConfig.getConsumerBootstrap() + .getCluster().getConnectionHolder().getAvailableConnections()); + } + }, true, 50, 40)); + + serverConfig.getServer().start(); + // 等待客户端重连服务端 + Assert.assertTrue(TestUtils.delayGet(new Callable() { + @Override + public Boolean call() throws Exception { + return CommonUtils.isNotEmpty(consumerConfig.getConsumerBootstrap() + .getCluster().getConnectionHolder().getAvailableConnections()); + } + }, true, 50, 60)); + + Assert.assertEquals(restService.query(11), "hello world !null"); + } +}