diff --git a/extension-impl/extension-common/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultProviderBootstrap.java b/extension-impl/extension-common/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultProviderBootstrap.java index a920cc830..ec5b5fd52 100644 --- a/extension-impl/extension-common/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultProviderBootstrap.java +++ b/extension-impl/extension-common/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultProviderBootstrap.java @@ -110,36 +110,50 @@ private void doExport() { if (exported) { return; } - String key = providerConfig.buildKey(); - String appName = providerConfig.getAppName(); + // 检查参数 checkParameters(); - if (LOGGER.isInfoEnabled(appName)) { - LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId()); - } - // 注意同一interface,同一uniqleId,不同server情况 - AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器 - if (cnt == null) { // 没有发布过 - cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0)); - } - int c = cnt.incrementAndGet(); - int maxProxyCount = providerConfig.getRepeatedExportLimit(); - if (maxProxyCount > 0) { - if (c > maxProxyCount) { - cnt.decrementAndGet(); - // 超过最大数量,直接抛出异常 - throw new SofaRpcRuntimeException("Duplicate provider config with key " + key - + " has been exported more than " + maxProxyCount + " times!" - + " Maybe it's wrong config, please check it." - + " Ignore this if you did that on purpose!"); - } else if (c > 1) { - if (LOGGER.isInfoEnabled(appName)) { - LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!" + String appName = providerConfig.getAppName(); + + //key is the protocol of server,for concurrent safe + Map hasExportedInCurrent = new ConcurrentHashMap(); + // 将处理器注册到server + List serverConfigs = providerConfig.getServer(); + for (ServerConfig serverConfig : serverConfigs) { + String protocol = serverConfig.getProtocol(); + + String key = providerConfig.buildKey() + ":" + protocol; + + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId()); + } + + // 注意同一interface,同一uniqleId,不同server情况 + AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器 + if (cnt == null) { // 没有发布过 + cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0)); + } + int c = cnt.incrementAndGet(); + hasExportedInCurrent.put(serverConfig.getProtocol(), true); + int maxProxyCount = providerConfig.getRepeatedExportLimit(); + if (maxProxyCount > 0) { + if (c > maxProxyCount) { + decrementCounter(hasExportedInCurrent); + // 超过最大数量,直接抛出异常 + throw new SofaRpcRuntimeException("Duplicate provider config with key " + key + + " has been exported more than " + maxProxyCount + " times!" + " Maybe it's wrong config, please check it." - + " Ignore this if you did that on purpose!", key); + + " Ignore this if you did that on purpose!"); + } else if (c > 1) { + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!" + + " Maybe it's wrong config, please check it." + + " Ignore this if you did that on purpose!", key); + } } } + } try { @@ -155,7 +169,6 @@ private void doExport() { } } // 将处理器注册到server - List serverConfigs = providerConfig.getServer(); for (ServerConfig serverConfig : serverConfigs) { try { Server server = serverConfig.buildIfAbsent(); @@ -164,6 +177,7 @@ private void doExport() { if (serverConfig.isAutoStart()) { server.start(); } + } catch (SofaRpcRuntimeException e) { throw e; } catch (Exception e) { @@ -176,7 +190,8 @@ private void doExport() { providerConfig.setConfigListener(new ProviderAttributeListener()); register(); } catch (Exception e) { - cnt.decrementAndGet(); + decrementCounter(hasExportedInCurrent); + if (e instanceof SofaRpcRuntimeException) { throw (SofaRpcRuntimeException) e; } else { @@ -189,6 +204,22 @@ private void doExport() { exported = true; } + /** + * decrease counter + * @param hasExportedInCurrent + */ + private void decrementCounter(Map hasExportedInCurrent) { + //once error, we decrementAndGet the counter + for (Map.Entry entry : hasExportedInCurrent.entrySet()) { + String protocol = entry.getKey(); + String key = providerConfig.buildKey() + ":" + protocol; + AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器 + if (cnt != null && cnt.get() > 0) { + cnt.decrementAndGet(); + } + } + } + /** * for check fields and parameters of consumer config */ @@ -246,12 +277,16 @@ public void unExport() { if (!exported) { return; } - - String key = providerConfig.buildKey(); String appName = providerConfig.getAppName(); - if (LOGGER.isInfoEnabled(appName)) { - LOGGER.infoWithApp(appName, "Unexport provider config : {} {}", key, providerConfig.getId() != null - ? "with bean id " + providerConfig.getId() : ""); + + List serverConfigs = providerConfig.getServer(); + for (ServerConfig serverConfig : serverConfigs) { + String protocol = serverConfig.getProtocol(); + String key = providerConfig.buildKey() + ":" + protocol; + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, "Unexport provider config : {} {}", key, providerConfig.getId() != null + ? "with bean id " + providerConfig.getId() : ""); + } } // 取消注册到注册中心 @@ -260,7 +295,6 @@ public void unExport() { providerProxyInvoker = null; // 取消将处理器注册到server - List serverConfigs = providerConfig.getServer(); if (serverConfigs != null) { for (ServerConfig serverConfig : serverConfigs) { Server server = serverConfig.getServer(); @@ -281,10 +315,15 @@ public void unExport() { providerConfig.setConfigListener(null); // 清除缓存状态 - AtomicInteger cnt = EXPORTED_KEYS.get(key); - if (cnt != null && cnt.decrementAndGet() <= 0) { - EXPORTED_KEYS.remove(key); + for (ServerConfig serverConfig : serverConfigs) { + String protocol = serverConfig.getProtocol(); + String key = providerConfig.buildKey() + ":" + protocol; + AtomicInteger cnt = EXPORTED_KEYS.get(key); + if (cnt != null && cnt.decrementAndGet() <= 0) { + EXPORTED_KEYS.remove(key); + } } + RpcRuntimeContext.invalidateProviderConfig(this); exported = false; } diff --git a/extension-impl/registry-zk/src/main/java/com/alipay/sofa/rpc/registry/zk/ZookeeperRegistryHelper.java b/extension-impl/registry-zk/src/main/java/com/alipay/sofa/rpc/registry/zk/ZookeeperRegistryHelper.java index 394b3a624..f6a8e3ccb 100644 --- a/extension-impl/registry-zk/src/main/java/com/alipay/sofa/rpc/registry/zk/ZookeeperRegistryHelper.java +++ b/extension-impl/registry-zk/src/main/java/com/alipay/sofa/rpc/registry/zk/ZookeeperRegistryHelper.java @@ -84,10 +84,6 @@ static List convertProviderToUrls(ProviderConfig providerConfig) { .append( getKeyPairs(RpcConstants.CONFIG_KEY_DYNAMIC, providerConfig.isDynamic())) .append(getKeyPairs(ProviderInfoAttrs.ATTR_WEIGHT, providerConfig.getWeight())) - .append(getKeyPairs(ProviderInfoAttrs.ATTR_WARMUP_TIME, - providerConfig.getParameter(ProviderInfoAttrs.ATTR_WARMUP_TIME))) - .append(getKeyPairs(ProviderInfoAttrs.ATTR_WARMUP_WEIGHT, - providerConfig.getParameter(ProviderInfoAttrs.ATTR_WARMUP_WEIGHT))) .append(getKeyPairs("accepts", server.getAccepts())) .append(getKeyPairs(ProviderInfoAttrs.ATTR_START_TIME, RpcRuntimeContext.now())) .append( diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/server/multi/MultiProtolServerTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/server/multi/MultiProtolServerTest.java new file mode 100644 index 000000000..1f71e60e3 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/server/multi/MultiProtolServerTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.server.multi; + +import com.alipay.sofa.rpc.bootstrap.DefaultProviderBootstrap; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcRuntimeContext; +import com.alipay.sofa.rpc.server.rest.RestService; +import com.alipay.sofa.rpc.server.rest.RestServiceImpl; +import com.alipay.sofa.rpc.test.ActivelyDestroyTest; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author GengZhang + */ +public class MultiProtolServerTest extends ActivelyDestroyTest { + + @Test + public void testMultiProtocol() { + + try { + // 只有2个线程 执行 + ServerConfig serverConfig = new ServerConfig() + .setStopTimeout(0) + .setPort(22222) + .setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT) + .setQueues(100).setCoreThreads(1).setMaxThreads(2); + + // 发布一个服务,每个请求要执行1秒 + ProviderConfig providerConfig = new ProviderConfig() + .setInterfaceId(RestService.class.getName()) + .setRef(new RestServiceImpl()) + .setServer(serverConfig) + .setRepeatedExportLimit(1) + .setRegister(false); + providerConfig.export(); + + ServerConfig serverConfig2 = new ServerConfig() + .setStopTimeout(0) + .setPort(22223) + .setProtocol(RpcConstants.PROTOCOL_TYPE_REST) + .setQueues(100).setCoreThreads(1).setMaxThreads(2); + + // 发布一个服务,每个请求要执行1秒 + ProviderConfig providerConfig2 = new ProviderConfig() + .setInterfaceId(RestService.class.getName()) + .setRef(new RestServiceImpl()) + .setServer(serverConfig2) + .setRepeatedExportLimit(1) + .setRegister(false); + providerConfig2.export(); + } catch (Throwable e) { + Assert.fail(); + } + + } + + @Test + public void testMultiProtocolExp() throws NoSuchFieldException { + try { + // 只有2个线程 执行 + ServerConfig serverConfig = new ServerConfig() + .setStopTimeout(0) + .setPort(22222) + .setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT) + .setQueues(100).setCoreThreads(1).setMaxThreads(2); + + // 发布一个服务,每个请求要执行1秒 + ProviderConfig providerConfig = new ProviderConfig() + .setInterfaceId(RestService.class.getName()) + .setRef(new RestServiceImpl()) + .setServer(serverConfig) + .setRepeatedExportLimit(1) + .setRegister(false); + providerConfig.export(); + + ServerConfig serverConfig2 = new ServerConfig() + .setStopTimeout(0) + .setPort(22223) + .setProtocol(RpcConstants.PROTOCOL_TYPE_REST) + .setQueues(100).setCoreThreads(1).setMaxThreads(2); + + // 发布一个服务,每个请求要执行1秒 + ProviderConfig providerConfig2 = new ProviderConfig() + .setInterfaceId(RestService.class.getName()) + .setRef(new RestServiceImpl()) + .setServer(serverConfig2) + .setRepeatedExportLimit(1) + .setRegister(false); + providerConfig2.export(); + + ProviderConfig providerConfig3 = new ProviderConfig() + .setInterfaceId(RestService.class.getName()) + .setRef(new RestServiceImpl()) + .setServer(serverConfig2) + .setRepeatedExportLimit(1) + .setRegister(false); + providerConfig3.export(); + + Assert.fail(); + } catch (Throwable e) { + //reflect to fetch export key + ConcurrentHashMap map = null; + + Field field = DefaultProviderBootstrap.class.getDeclaredField("EXPORTED_KEYS"); + try { + field.setAccessible(true); + map = (ConcurrentHashMap) field.get(null); + } catch (IllegalAccessException e1) { + e1.printStackTrace(); + } + + //two providers publish done, the third will false, and revert counter, export value is 1 + for (Map.Entry entry : map.entrySet()) { + AtomicInteger atomicInteger = entry.getValue(); + Assert.assertEquals(1, atomicInteger.get()); + } + } + + } +} \ No newline at end of file