From e5b3a76815820bd717c1bb147fe0f7af5e61f8fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=81=AF=E7=BE=BD?= Date: Mon, 7 Mar 2016 19:04:10 +0800 Subject: [PATCH] change grid address and codec factory --- pom.xml | 4 +- src/main/java/darks/grid/GridContext.java | 8 +- .../java/darks/grid/beans/GridAddress.java | 119 ++++++++++++++++++ src/main/java/darks/grid/beans/NodeId.java | 5 +- .../darks/grid/beans/meta/MasterMeta.java | 10 +- .../java/darks/grid/config/CodecConfig.java | 72 +++++++++++ .../darks/grid/config/GridConfigFactory.java | 52 +++++++- .../java/darks/grid/config/NetworkConfig.java | 8 +- .../grid/events/handler/JoinReplyHandler.java | 6 +- .../events/handler/JoinRequestHandler.java | 8 +- .../grid/events/handler/NodeLeaveHandler.java | 5 +- .../darks/grid/manager/GridNodesManager.java | 13 +- .../grid/manager/GridStorageManager.java | 14 +-- .../darks/grid/network/GridLocalSession.java | 9 +- .../darks/grid/network/GridMessageClient.java | 18 ++- .../darks/grid/network/GridMessageServer.java | 7 +- .../grid/network/GridNetworkManager.java | 27 ++-- .../darks/grid/network/GridRemoteSession.java | 9 +- .../java/darks/grid/network/GridSession.java | 6 +- .../grid/network/codec/CodecFactory.java | 109 ++++++++++++++++ .../darks/grid/network/codec/FSTCodec.java | 8 ++ .../grid/network/codec/GenericCodec.java | 7 ++ .../darks/grid/network/codec/GridCodec.java | 7 +- .../grid/network/codec/GridObjectDecoder.java | 14 +-- .../grid/network/codec/GridObjectEncoder.java | 5 + .../grid/network/codec/HessianCodec.java | 7 ++ .../darks/grid/network/codec/KryoCodec.java | 57 ++++++--- .../grid/network/discovery/MERGE_NODES.java | 4 +- .../darks/grid/network/discovery/TCPPING.java | 14 +-- .../darks/grid/network/handler/msg/JOIN.java | 8 +- .../grid/network/handler/msg/JOIN_REPLY.java | 6 +- .../java/darks/grid/utils/ParamsUtils.java | 8 +- .../java/darks/grid/test/codec/CodecTest.java | 4 +- src/test/java/grid-config.xml | 5 +- 34 files changed, 540 insertions(+), 123 deletions(-) create mode 100644 src/main/java/darks/grid/beans/GridAddress.java create mode 100644 src/main/java/darks/grid/config/CodecConfig.java create mode 100644 src/main/java/darks/grid/network/codec/CodecFactory.java diff --git a/pom.xml b/pom.xml index 7dbe911..49052ac 100644 --- a/pom.xml +++ b/pom.xml @@ -108,9 +108,9 @@ true - com.esotericsoftware + com.esotericsoftware.kryo kryo - 3.0.3 + 2.21 true diff --git a/src/main/java/darks/grid/GridContext.java b/src/main/java/darks/grid/GridContext.java index ecdbd32..225a667 100644 --- a/src/main/java/darks/grid/GridContext.java +++ b/src/main/java/darks/grid/GridContext.java @@ -17,8 +17,8 @@ package darks.grid; import java.io.Serializable; -import java.net.InetSocketAddress; +import darks.grid.beans.GridAddress; import darks.grid.beans.MachineInfo; import darks.grid.commons.MachineInfoFactory; import darks.grid.config.GridConfiguration; @@ -37,7 +37,7 @@ public class GridContext implements Serializable, GridManager private String clusterName; - private InetSocketAddress serverAddress; + private GridAddress serverAddress; private transient MachineInfoFactory machineInfoFactory; @@ -112,12 +112,12 @@ public void setClusterName(String clusterName) this.clusterName = clusterName; } - public synchronized InetSocketAddress getServerAddress() + public synchronized GridAddress getServerAddress() { return serverAddress; } - public synchronized void setServerAddress(InetSocketAddress serverAddress) + public synchronized void setServerAddress(GridAddress serverAddress) { this.serverAddress = serverAddress; } diff --git a/src/main/java/darks/grid/beans/GridAddress.java b/src/main/java/darks/grid/beans/GridAddress.java new file mode 100644 index 0000000..5b0f1cb --- /dev/null +++ b/src/main/java/darks/grid/beans/GridAddress.java @@ -0,0 +1,119 @@ +/** + * + * Copyright 2015 The Darks Grid Project (Liu lihua) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package darks.grid.beans; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import darks.grid.utils.StringUtils; + +public class GridAddress implements Serializable +{ + + /** + * + */ + private static final long serialVersionUID = -715203941820641100L; + + private String hostName; + + private int port; + + public GridAddress() + { + + } + + public GridAddress(InetSocketAddress addr) + { + this(addr.getHostName(), addr.getPort()); + } + + public GridAddress(String hostName, int port) + { + this.hostName = hostName; + this.port = port; + } + + public InetSocketAddress getSocketAddress() + { + return new InetSocketAddress(hostName, port); + } + + public static GridAddress wrap(InetSocketAddress addr) + { + return new GridAddress(addr); + } + + public static GridAddress wrap(SocketAddress addr) + { + return wrap((InetSocketAddress) addr); + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + @Override + public String toString() { + return StringUtils.stringBuffer(hostName, ':', port); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((hostName == null) ? 0 : hostName.hashCode()); + result = prime * result + port; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + GridAddress other = (GridAddress) obj; + if (hostName == null) { + if (other.hostName != null) + return false; + } else if (!hostName.equals(other.hostName)) + return false; + if (port != other.port) + return false; + return true; + } + + +} diff --git a/src/main/java/darks/grid/beans/NodeId.java b/src/main/java/darks/grid/beans/NodeId.java index e1518ac..b58b6ba 100644 --- a/src/main/java/darks/grid/beans/NodeId.java +++ b/src/main/java/darks/grid/beans/NodeId.java @@ -18,10 +18,9 @@ import java.io.ByteArrayOutputStream; import java.lang.management.ManagementFactory; -import java.net.InetSocketAddress; -import darks.grid.GridRuntime; import darks.grid.GridException; +import darks.grid.GridRuntime; import darks.grid.utils.BytesUtils; import darks.grid.utils.NetworkUtils; @@ -41,7 +40,7 @@ public static String localId() { byte[] macBytes = NetworkUtils.getMacBytes(); String proccessId = ManagementFactory.getRuntimeMXBean().getName(); - InetSocketAddress ipAddress = GridRuntime.network().getBindAddress(); + GridAddress ipAddress = GridRuntime.network().getBindAddress(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); if (macBytes != null) baos.write(macBytes); diff --git a/src/main/java/darks/grid/beans/meta/MasterMeta.java b/src/main/java/darks/grid/beans/meta/MasterMeta.java index 8b777a2..3b35220 100644 --- a/src/main/java/darks/grid/beans/meta/MasterMeta.java +++ b/src/main/java/darks/grid/beans/meta/MasterMeta.java @@ -17,7 +17,7 @@ package darks.grid.beans.meta; -import java.net.InetSocketAddress; +import darks.grid.beans.GridAddress; public class MasterMeta extends BaseMeta { @@ -29,14 +29,14 @@ public class MasterMeta extends BaseMeta private String nodeId; - private InetSocketAddress address; + private GridAddress address; public MasterMeta() { } - public MasterMeta(String nodeId, InetSocketAddress address) + public MasterMeta(String nodeId, GridAddress address) { super(); this.nodeId = nodeId; @@ -53,12 +53,12 @@ public void setNodeId(String nodeId) this.nodeId = nodeId; } - public InetSocketAddress getAddress() + public GridAddress getAddress() { return address; } - public void setAddress(InetSocketAddress address) + public void setAddress(GridAddress address) { this.address = address; } diff --git a/src/main/java/darks/grid/config/CodecConfig.java b/src/main/java/darks/grid/config/CodecConfig.java new file mode 100644 index 0000000..bad10bd --- /dev/null +++ b/src/main/java/darks/grid/config/CodecConfig.java @@ -0,0 +1,72 @@ +/** + * + * Copyright 2015 The Darks Grid Project (Liu lihua) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package darks.grid.config; + +import java.util.HashMap; +import java.util.Map; + +import darks.grid.network.codec.GridCodec; + +public class CodecConfig +{ + + + + private String type; + + private Class codecClass = null; + + private Map parameters = new HashMap(); + + public CodecConfig() + { + + } + + public void addParameter(String key, String value) + { + parameters.put(key, value); + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public Class getCodecClass() { + return codecClass; + } + + public void setCodecClass(Class codecClass) { + this.codecClass = codecClass; + } + + public Map getParameters() { + return parameters; + } + + public void setParameters(Map parameters) { + this.parameters = parameters; + } + + + +} diff --git a/src/main/java/darks/grid/config/GridConfigFactory.java b/src/main/java/darks/grid/config/GridConfigFactory.java index 71b177a..5603e2f 100644 --- a/src/main/java/darks/grid/config/GridConfigFactory.java +++ b/src/main/java/darks/grid/config/GridConfigFactory.java @@ -34,6 +34,7 @@ import darks.grid.GridException; import darks.grid.config.EventsConfig.EventsChannelConfig; import darks.grid.config.MasterConfig.MasterTaskConfig; +import darks.grid.network.codec.GridCodec; import darks.grid.utils.IOUtils; import darks.grid.utils.ReflectUtils; @@ -97,7 +98,7 @@ private static GridConfiguration parseDocument(Document doc) Element el = (Element) node; if ("network".equalsIgnoreCase(el.getNodeName())) { - parseAttrForObject(el, config.getNetworkConfig()); + parseNetwork(config, el); } else if ("task".equalsIgnoreCase(el.getNodeName())) { @@ -136,6 +137,55 @@ else if ("constant".equalsIgnoreCase(el.getNodeName())) return config; } + private static void parseNetwork(GridConfiguration config, Element el) + { + parseAttrForObject(el, config.getNetworkConfig()); + NodeList nodesList = el.getChildNodes(); + for (int i = 0; i < nodesList.getLength(); i++) + { + Node node = nodesList.item(i); + if (node instanceof Element) + { + Element elChild = (Element) node; + if ("codec".equalsIgnoreCase(elChild.getNodeName())) + { + parseNetworkCodec(config.getNetworkConfig().getCodecConfig(), elChild); + } + } + } + } + + @SuppressWarnings("unchecked") + private static void parseNetworkCodec(CodecConfig config, Element el) + { + try + { + NamedNodeMap nameNodes = el.getAttributes(); + for (int i = 0; i < nameNodes.getLength(); i++) + { + Node node = nameNodes.item(i); + String attrName = node.getNodeName().trim(); + String attrValue = node.getNodeValue().trim(); + if ("type".equalsIgnoreCase(attrName)) + { + config.setType(attrValue); + } + else if ("class".equalsIgnoreCase(attrName)) + { + config.setCodecClass((Class) Class.forName(attrValue)); + } + else + { + config.addParameter(attrName, attrValue); + } + } + } + catch (Exception e) + { + throw new GridException("Fail to parse codec. Cause " + e.getMessage(), e); + } + } + private static void parseEvents(GridConfiguration config, Element el) { parseAttrForObject(el, config.getEventsConfig()); diff --git a/src/main/java/darks/grid/config/NetworkConfig.java b/src/main/java/darks/grid/config/NetworkConfig.java index fa08cda..b8c1f47 100644 --- a/src/main/java/darks/grid/config/NetworkConfig.java +++ b/src/main/java/darks/grid/config/NetworkConfig.java @@ -70,6 +70,8 @@ public class NetworkConfig private boolean cacheHistoryNodes = true; + private CodecConfig codecConfig = new CodecConfig(); + public NetworkConfig() { @@ -300,8 +302,12 @@ public void setConnectFailRetry(int connectFailRetry) { this.connectFailRetry = connectFailRetry; } + + public CodecConfig getCodecConfig() { + return codecConfig; + } - @Override + @Override public String toString() { return "NetworkConfig [bindHost=" + bindHost + ", bindPort=" + bindPort diff --git a/src/main/java/darks/grid/events/handler/JoinReplyHandler.java b/src/main/java/darks/grid/events/handler/JoinReplyHandler.java index 85b55c2..0fcda67 100644 --- a/src/main/java/darks/grid/events/handler/JoinReplyHandler.java +++ b/src/main/java/darks/grid/events/handler/JoinReplyHandler.java @@ -16,7 +16,6 @@ */ package darks.grid.events.handler; -import java.net.SocketAddress; import java.util.Map; import java.util.Map.Entry; @@ -24,6 +23,7 @@ import org.slf4j.LoggerFactory; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.beans.GridEvent; import darks.grid.beans.meta.JoinMeta; import darks.grid.beans.meta.JoinNodeMeta; @@ -43,8 +43,8 @@ public void handle(GridEvent event) throws Exception String nodeId = meta.getNodeId(); synchronized (nodeId.intern()) { - Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); - for (Entry entry : nodesMap.entrySet()) + Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); + for (Entry entry : nodesMap.entrySet()) { try { diff --git a/src/main/java/darks/grid/events/handler/JoinRequestHandler.java b/src/main/java/darks/grid/events/handler/JoinRequestHandler.java index 9d352b5..722935d 100644 --- a/src/main/java/darks/grid/events/handler/JoinRequestHandler.java +++ b/src/main/java/darks/grid/events/handler/JoinRequestHandler.java @@ -16,7 +16,6 @@ */ package darks.grid.events.handler; -import java.net.SocketAddress; import java.util.Map; import java.util.Map.Entry; @@ -24,6 +23,7 @@ import org.slf4j.LoggerFactory; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.beans.GridEvent; import darks.grid.beans.GridNode; import darks.grid.beans.meta.JoinMeta; @@ -101,10 +101,10 @@ else if (count == 1) private void handleRepeatChannel(GridSession session, JoinMeta meta) { String nodeId = meta.getNodeId(); - Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); + Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); long keepJoinTime = 0; JoinMeta keepJoinMeta = null; - for (Entry entry : nodesMap.entrySet()) + for (Entry entry : nodesMap.entrySet()) { JoinMeta joinMeta = entry.getValue(); if (keepJoinMeta == null || joinMeta.getJoinTime() < keepJoinTime) @@ -136,7 +136,7 @@ private void handleNewChannel(GridSession session, JoinMeta meta, boolean autoJo { if (success) GridRuntime.nodes().addRemoteNode(nodeId, session, meta.context()); - Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); + Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); nodesMap.clear(); } } diff --git a/src/main/java/darks/grid/events/handler/NodeLeaveHandler.java b/src/main/java/darks/grid/events/handler/NodeLeaveHandler.java index d9f2127..9b28a17 100644 --- a/src/main/java/darks/grid/events/handler/NodeLeaveHandler.java +++ b/src/main/java/darks/grid/events/handler/NodeLeaveHandler.java @@ -16,12 +16,11 @@ */ package darks.grid.events.handler; -import java.net.InetSocketAddress; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.beans.GridEvent; import darks.grid.beans.GridNode; import darks.grid.events.GridEventHandler; @@ -54,7 +53,7 @@ public void handle(GridEvent event) throws Exception private boolean retryConnect(GridNode node) { - InetSocketAddress address = node.context().getServerAddress(); + GridAddress address = node.context().getServerAddress(); int retryCount = GridRuntime.config().getNetworkConfig().getConnectFailRetry(); for (int i = 0; i < retryCount; i++) { diff --git a/src/main/java/darks/grid/manager/GridNodesManager.java b/src/main/java/darks/grid/manager/GridNodesManager.java index 0f898cd..68fc535 100644 --- a/src/main/java/darks/grid/manager/GridNodesManager.java +++ b/src/main/java/darks/grid/manager/GridNodesManager.java @@ -16,8 +16,6 @@ */ package darks.grid.manager; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -33,6 +31,7 @@ import darks.grid.GridContext; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.beans.GridEvent; import darks.grid.beans.GridNode; import darks.grid.beans.GridNode.GridNodeType; @@ -52,7 +51,7 @@ public class GridNodesManager implements GridManager private ConcurrentSkipListSet nodesSet = new ConcurrentSkipListSet(new NodeComparator()); - private Map addressMap = new ConcurrentHashMap(); + private Map addressMap = new ConcurrentHashMap(); private Map sessionIdMap = new ConcurrentHashMap(); @@ -146,7 +145,7 @@ public String getNodesInfo() return buf.toString(); } - public boolean contains(SocketAddress address) + public boolean contains(GridAddress address) { return addressMap.containsKey(address); } @@ -212,7 +211,7 @@ public synchronized GridNode removeNode(GridSession session) return node; } - public GridNode getNode(InetSocketAddress address) + public GridNode getNode(GridAddress address) { String nodeId = addressMap.get(address); if (nodeId != null) @@ -220,7 +219,7 @@ public GridNode getNode(InetSocketAddress address) return null; } - public String getNodeId(InetSocketAddress address) + public String getNodeId(GridAddress address) { return addressMap.get(address); } @@ -255,7 +254,7 @@ public Map getNodesMap() return nodesMap; } - public Map getAddressMap() + public Map getAddressMap() { return addressMap; } diff --git a/src/main/java/darks/grid/manager/GridStorageManager.java b/src/main/java/darks/grid/manager/GridStorageManager.java index aa2cdbd..1ac71b8 100644 --- a/src/main/java/darks/grid/manager/GridStorageManager.java +++ b/src/main/java/darks/grid/manager/GridStorageManager.java @@ -18,13 +18,13 @@ package darks.grid.manager; import java.io.File; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import darks.grid.beans.GridAddress; import darks.grid.config.GridConfiguration; import darks.grid.utils.FileUtils; import darks.grid.utils.StringUtils; @@ -57,30 +57,30 @@ public void destroy() } - public synchronized void cacheHistoryNodes(InetSocketAddress address) + public synchronized void cacheHistoryNodes(GridAddress address) { boolean cached = config.getNetworkConfig().isCacheHistoryNodes(); if (!cached) return; - String addr = StringUtils.stringBuffer(address.getAddress().getHostAddress(), ':', address.getPort()); + String addr = StringUtils.stringBuffer(address.getHostName(), ':', address.getPort()); FileUtils.appendLine(historyNodesFile, addr); } - public synchronized Collection getCacheHistoryNodes() + public synchronized Collection getCacheHistoryNodes() { if (historyNodesFile.exists()) { List addresses = FileUtils.readLineToList(historyNodesFile); - Set result = new HashSet(addresses.size()); + Set result = new HashSet(addresses.size()); for (String addr : addresses) { String[] datas = addr.split(":"); - result.add(new InetSocketAddress(datas[0], Integer.parseInt(datas[1]))); + result.add(new GridAddress(datas[0], Integer.parseInt(datas[1]))); } return result; } else - return new ArrayList(0); + return new ArrayList(0); } } diff --git a/src/main/java/darks/grid/network/GridLocalSession.java b/src/main/java/darks/grid/network/GridLocalSession.java index 43e71c4..83bd70e 100644 --- a/src/main/java/darks/grid/network/GridLocalSession.java +++ b/src/main/java/darks/grid/network/GridLocalSession.java @@ -21,6 +21,7 @@ import java.net.InetSocketAddress; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.utils.ChannelUtils; import darks.grid.utils.NetworkUtils; @@ -31,7 +32,7 @@ public class GridLocalSession implements GridSession int failRetryCount; - InetSocketAddress bindAddress; + GridAddress bindAddress; public GridLocalSession(Channel channel) { @@ -92,13 +93,13 @@ public boolean isActive() } @Override - public InetSocketAddress remoteAddress() + public GridAddress remoteAddress() { return localAddress(); } @Override - public synchronized InetSocketAddress localAddress() + public synchronized GridAddress localAddress() { if (channel == null) return null; @@ -106,7 +107,7 @@ public synchronized InetSocketAddress localAddress() { String ipHost = NetworkUtils.getIpAddress(); InetSocketAddress ipAddr = (InetSocketAddress) channel.localAddress(); - bindAddress = new InetSocketAddress(ipHost, ipAddr.getPort()); + bindAddress = new GridAddress(ipHost, ipAddr.getPort()); } return bindAddress; } diff --git a/src/main/java/darks/grid/network/GridMessageClient.java b/src/main/java/darks/grid/network/GridMessageClient.java index fad22b0..db09df5 100644 --- a/src/main/java/darks/grid/network/GridMessageClient.java +++ b/src/main/java/darks/grid/network/GridMessageClient.java @@ -28,8 +28,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; -import io.netty.handler.codec.serialization.ObjectDecoder; -import io.netty.handler.codec.serialization.ObjectEncoder; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -41,7 +39,9 @@ import org.slf4j.LoggerFactory; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.config.NetworkConfig; +import darks.grid.network.codec.CodecFactory; import darks.grid.network.handler.GridClientMessageHandler; import darks.grid.utils.ThreadUtils; @@ -102,6 +102,16 @@ public Channel connect(String host, int port) return connect(new InetSocketAddress(host, port)); } + public Channel connect(GridAddress address) + { + return connect(address.getSocketAddress()); + } + + public Channel connect(GridAddress address, boolean sync) + { + return connect(address.getSocketAddress(), sync); + } + public Channel connect(SocketAddress address) { return connect(address, true); @@ -155,8 +165,8 @@ private ChannelInitializer newChannelHandler() protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); - pipeline.addLast("encoder", new ObjectEncoder()); + pipeline.addLast("decoder", CodecFactory.createDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); + pipeline.addLast("encoder", CodecFactory.createEncoder()); // pipeline.addLast("alive", new IdleStateHandler(60, 60, 120)); pipeline.addLast("message", new GridClientMessageHandler()); } diff --git a/src/main/java/darks/grid/network/GridMessageServer.java b/src/main/java/darks/grid/network/GridMessageServer.java index 029deea..f92a189 100644 --- a/src/main/java/darks/grid/network/GridMessageServer.java +++ b/src/main/java/darks/grid/network/GridMessageServer.java @@ -28,8 +28,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; -import io.netty.handler.codec.serialization.ObjectDecoder; -import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.timeout.IdleStateHandler; import java.net.BindException; @@ -41,6 +39,7 @@ import darks.grid.GridRuntime; import darks.grid.config.NetworkConfig; +import darks.grid.network.codec.CodecFactory; import darks.grid.network.handler.GridServerMessageHandler; public class GridMessageServer extends GridMessageDispatcher @@ -153,8 +152,8 @@ private ChannelInitializer newChannelHandler() protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); - pipeline.addLast("encoder", new ObjectEncoder()); + pipeline.addLast("decoder", CodecFactory.createDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); + pipeline.addLast("encoder", CodecFactory.createEncoder()); pipeline.addLast("alive", new IdleStateHandler(60, 60, 120)); pipeline.addLast("message", new GridServerMessageHandler()); } diff --git a/src/main/java/darks/grid/network/GridNetworkManager.java b/src/main/java/darks/grid/network/GridNetworkManager.java index e393d9e..9b2059e 100644 --- a/src/main/java/darks/grid/network/GridNetworkManager.java +++ b/src/main/java/darks/grid/network/GridNetworkManager.java @@ -17,7 +17,6 @@ package darks.grid.network; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -27,6 +26,7 @@ import org.slf4j.LoggerFactory; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.beans.GridNode; import darks.grid.beans.TimerObject; import darks.grid.beans.meta.JoinMeta; @@ -47,9 +47,9 @@ public class GridNetworkManager implements GridManager private GridMessageClient messageClient; - private Map> waitActive = new ConcurrentHashMap>(); + private Map> waitActive = new ConcurrentHashMap>(); - private Map> waitJoin = new ConcurrentHashMap>(); + private Map> waitJoin = new ConcurrentHashMap>(); private Object mutex = new Object(); @@ -118,13 +118,18 @@ public void sendMessageToOthers(Object obj) node.sendSyncMessage(obj); } } + + public boolean tryJoinAddress(GridAddress address) + { + return tryJoinAddress(address, true); + } public boolean tryJoinAddress(InetSocketAddress address) { - return tryJoinAddress(address, true); + return tryJoinAddress(new GridAddress(address), true); } - public boolean tryJoinAddress(InetSocketAddress address, boolean sync) + public boolean tryJoinAddress(GridAddress address, boolean sync) { if (address == null) { @@ -162,10 +167,10 @@ public int addWaitJoin(String nodeId, JoinMeta meta) { synchronized (mutex) { - Map channelMap = waitJoin.get(nodeId); + Map channelMap = waitJoin.get(nodeId); if (channelMap == null) { - channelMap = new ConcurrentHashMap(); + channelMap = new ConcurrentHashMap(); waitJoin.put(nodeId, channelMap); } meta.setJoinTime(System.currentTimeMillis()); @@ -184,21 +189,21 @@ public void removeWaitActive(GridSession session) waitActive.remove(session.remoteAddress()); } - public synchronized Map getWaitJoin(String nodeId) + public synchronized Map getWaitJoin(String nodeId) { synchronized (mutex) { - Map channelMap = waitJoin.get(nodeId); + Map channelMap = waitJoin.get(nodeId); if (channelMap == null) { - channelMap = new ConcurrentHashMap(); + channelMap = new ConcurrentHashMap(); waitJoin.put(nodeId, channelMap); } return channelMap; } } - public synchronized InetSocketAddress getBindAddress() + public synchronized GridAddress getBindAddress() { if (messageServer == null || serverSession == null) return null; diff --git a/src/main/java/darks/grid/network/GridRemoteSession.java b/src/main/java/darks/grid/network/GridRemoteSession.java index b6b61cd..fa2e244 100644 --- a/src/main/java/darks/grid/network/GridRemoteSession.java +++ b/src/main/java/darks/grid/network/GridRemoteSession.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.utils.ChannelUtils; public class GridRemoteSession implements GridSession @@ -132,19 +133,19 @@ public boolean isActive() } @Override - public InetSocketAddress remoteAddress() + public GridAddress remoteAddress() { if (channel == null) return null; - return (InetSocketAddress)channel.remoteAddress(); + return GridAddress.wrap((InetSocketAddress)channel.remoteAddress()); } @Override - public InetSocketAddress localAddress() + public GridAddress localAddress() { if (channel == null) return null; - return (InetSocketAddress)channel.localAddress(); + return GridAddress.wrap((InetSocketAddress)channel.localAddress()); } @Override diff --git a/src/main/java/darks/grid/network/GridSession.java b/src/main/java/darks/grid/network/GridSession.java index e1396e7..ccf5918 100644 --- a/src/main/java/darks/grid/network/GridSession.java +++ b/src/main/java/darks/grid/network/GridSession.java @@ -16,7 +16,7 @@ */ package darks.grid.network; -import java.net.InetSocketAddress; +import darks.grid.beans.GridAddress; public interface GridSession { @@ -33,9 +33,9 @@ public interface GridSession public boolean isActive(); - public InetSocketAddress remoteAddress(); + public GridAddress remoteAddress(); - public InetSocketAddress localAddress(); + public GridAddress localAddress(); public boolean isLocal(); } diff --git a/src/main/java/darks/grid/network/codec/CodecFactory.java b/src/main/java/darks/grid/network/codec/CodecFactory.java new file mode 100644 index 0000000..7242e40 --- /dev/null +++ b/src/main/java/darks/grid/network/codec/CodecFactory.java @@ -0,0 +1,109 @@ +/** + * + * Copyright 2015 The Darks Grid Project (Liu lihua) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package darks.grid.network.codec; + +import io.netty.channel.ChannelHandler; +import io.netty.handler.codec.serialization.ClassResolver; +import io.netty.handler.codec.serialization.ObjectDecoder; +import io.netty.handler.codec.serialization.ObjectEncoder; + +import java.util.HashMap; +import java.util.Map; + +import darks.grid.GridException; +import darks.grid.GridRuntime; +import darks.grid.config.CodecConfig; +import darks.grid.utils.ReflectUtils; + +public final class CodecFactory +{ + + public static final String NETTY_CODEC = "netty"; + public static final String GENERIC_CODEC = "generic"; + public static final String FST_CODEC = "fst"; + public static final String HESSIAN_CODEC = "hessian"; + public static final String KRYO_CODEC = "kryo"; + + public static final String DEFAULT_CODEC = NETTY_CODEC; + + private static Map> codecMap = + new HashMap>(); + + static + { + register(GENERIC_CODEC, GenericCodec.class); + register(KRYO_CODEC, KryoCodec.class); + register(FST_CODEC, FSTCodec.class); + register(HESSIAN_CODEC, HessianCodec.class); + } + + public static void register(String type, Class clazz) + { + codecMap.put(type, clazz); + } + + public static ChannelHandler createEncoder() + { + CodecConfig codecConfig = GridRuntime.config().getNetworkConfig().getCodecConfig(); + String type = DEFAULT_CODEC; + Class codecClass = null; + if (codecConfig.getType() != null) + { + type = codecConfig.getType().toLowerCase(); + codecClass = codecConfig.getCodecClass(); + } + if (NETTY_CODEC.equals(type)) + { + return new ObjectEncoder(); + } + if (codecMap.containsKey(type) && codecClass == null) + { + codecClass = codecMap.get(type); + } + if (codecClass == null) + throw new GridException("Cannot find codec " + type); + GridCodec codec = ReflectUtils.newInstance(codecClass); + codec.initialize(codecConfig.getParameters()); + return new GridObjectEncoder(codec); + } + + public static ChannelHandler createDecoder(ClassResolver classResolver) + { + CodecConfig codecConfig = GridRuntime.config().getNetworkConfig().getCodecConfig(); + String type = DEFAULT_CODEC; + Class codecClass = null; + if (codecConfig.getType() != null) + { + type = codecConfig.getType().toLowerCase(); + codecClass = codecConfig.getCodecClass(); + } + if (NETTY_CODEC.equals(type)) + { + return new ObjectDecoder(Integer.MAX_VALUE, classResolver); + } + if (codecMap.containsKey(type) && codecClass == null) + { + codecClass = codecMap.get(type); + } + if (codecClass == null) + throw new GridException("Cannot find codec " + type); + GridCodec codec = ReflectUtils.newInstance(codecClass); + codec.initialize(codecConfig.getParameters()); + return new GridObjectDecoder(classResolver, codec); + } +} diff --git a/src/main/java/darks/grid/network/codec/FSTCodec.java b/src/main/java/darks/grid/network/codec/FSTCodec.java index 5f4b575..3f7c7d4 100644 --- a/src/main/java/darks/grid/network/codec/FSTCodec.java +++ b/src/main/java/darks/grid/network/codec/FSTCodec.java @@ -3,6 +3,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.util.Map; import de.ruedigermoeller.serialization.FSTConfiguration; import de.ruedigermoeller.serialization.FSTObjectInput; @@ -14,7 +15,14 @@ public class FSTCodec implements GridCodec static FSTConfiguration conf = FSTConfiguration.createDefaultConfiguration(); + @Override + public void initialize(Map params) { + // TODO Auto-generated method stub + + } + + @Override public void encode(OutputStream out, Serializable msg) throws Exception { FSTObjectOutput fout = conf.getObjectOutput(out); diff --git a/src/main/java/darks/grid/network/codec/GenericCodec.java b/src/main/java/darks/grid/network/codec/GenericCodec.java index 6600f15..9ee4a6c 100644 --- a/src/main/java/darks/grid/network/codec/GenericCodec.java +++ b/src/main/java/darks/grid/network/codec/GenericCodec.java @@ -5,6 +5,7 @@ import java.io.ObjectOutputStream; import java.io.OutputStream; import java.io.Serializable; +import java.util.Map; import de.ruedigermoeller.serialization.FSTConfiguration; import io.netty.handler.codec.serialization.ClassResolver; @@ -15,6 +16,12 @@ public class GenericCodec implements GridCodec static FSTConfiguration conf = FSTConfiguration.createDefaultConfiguration(); @Override + public void initialize(Map params) { + // TODO Auto-generated method stub + + } + + @Override public void encode(OutputStream out, Serializable msg) throws Exception { ObjectOutputStream oos = new ObjectOutputStream(out); diff --git a/src/main/java/darks/grid/network/codec/GridCodec.java b/src/main/java/darks/grid/network/codec/GridCodec.java index a2af20b..1a59366 100644 --- a/src/main/java/darks/grid/network/codec/GridCodec.java +++ b/src/main/java/darks/grid/network/codec/GridCodec.java @@ -1,13 +1,16 @@ package darks.grid.network.codec; +import io.netty.handler.codec.serialization.ClassResolver; + import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - -import io.netty.handler.codec.serialization.ClassResolver; +import java.util.Map; public interface GridCodec { + + public void initialize(Map params); public void encode(OutputStream out, Serializable msg) throws Exception; diff --git a/src/main/java/darks/grid/network/codec/GridObjectDecoder.java b/src/main/java/darks/grid/network/codec/GridObjectDecoder.java index da645d6..841a377 100644 --- a/src/main/java/darks/grid/network/codec/GridObjectDecoder.java +++ b/src/main/java/darks/grid/network/codec/GridObjectDecoder.java @@ -14,17 +14,11 @@ public class GridObjectDecoder extends LengthFieldBasedFrameDecoder private final ClassResolver classResolver; private GridCodec codec; - - /** - * Creates a new decoder whose maximum object size is {@code 1048576} - * bytes. If the size of the received object is greater than - * {@code 1048576} bytes, a {@link StreamCorruptedException} will be - * raised. - * - * @param classResolver the {@link ClassResolver} to use for this decoder - */ - public GridObjectDecoder(ClassResolver classResolver) { + + public GridObjectDecoder(ClassResolver classResolver, GridCodec codec) + { this(Integer.MAX_VALUE, classResolver); + this.codec = codec; } /** diff --git a/src/main/java/darks/grid/network/codec/GridObjectEncoder.java b/src/main/java/darks/grid/network/codec/GridObjectEncoder.java index d94013b..4431c4a 100644 --- a/src/main/java/darks/grid/network/codec/GridObjectEncoder.java +++ b/src/main/java/darks/grid/network/codec/GridObjectEncoder.java @@ -13,6 +13,11 @@ public class GridObjectEncoder extends MessageToByteEncoder private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; GridCodec codec = null; + + public GridObjectEncoder(GridCodec codec) + { + this.codec = codec; + } @Override protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception diff --git a/src/main/java/darks/grid/network/codec/HessianCodec.java b/src/main/java/darks/grid/network/codec/HessianCodec.java index fc7eeb8..24786aa 100644 --- a/src/main/java/darks/grid/network/codec/HessianCodec.java +++ b/src/main/java/darks/grid/network/codec/HessianCodec.java @@ -3,6 +3,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.util.Map; import com.caucho.hessian.io.HessianInput; import com.caucho.hessian.io.HessianOutput; @@ -14,6 +15,12 @@ public class HessianCodec implements GridCodec @Override + public void initialize(Map params) { + // TODO Auto-generated method stub + + } + + @Override public void encode(OutputStream out, Serializable msg) throws Exception { HessianOutput ho = new HessianOutput(out); diff --git a/src/main/java/darks/grid/network/codec/KryoCodec.java b/src/main/java/darks/grid/network/codec/KryoCodec.java index f591798..6b08293 100644 --- a/src/main/java/darks/grid/network/codec/KryoCodec.java +++ b/src/main/java/darks/grid/network/codec/KryoCodec.java @@ -1,30 +1,47 @@ package darks.grid.network.codec; +import io.netty.handler.codec.serialization.ClassResolver; + import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.lang.ref.SoftReference; +import java.util.Map; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import io.netty.handler.codec.serialization.ClassResolver; - public class KryoCodec implements GridCodec { - +// private static final String DEFAULT_STREAM_FACTORY = "default"; +// private static final String FAST_STREAM_FACTORY = "fast"; +// private static final String KEY_STREAM_FACTORY = "stream_factory"; - ThreadLocal kryoLocal = new ThreadLocal(); - + private ThreadLocal> kryoLocal = new ThreadLocal>(); + @Override + public void initialize(Map params) + { +// String streamFactoryType = DEFAULT_STREAM_FACTORY; +// if (params.containsKey(KEY_STREAM_FACTORY)) +// { +// String value = params.get(KEY_STREAM_FACTORY); +// if (value != null && FAST_STREAM_FACTORY.equals(value)) +// { +// streamFactoryType = FAST_STREAM_FACTORY; +// } +// } +// if (FAST_STREAM_FACTORY.equals(streamFactoryType)) +// streamFactory = new FastestStreamFactory(); +// else +// streamFactory = new DefaultStreamFactory(); + } + + @Override public void encode(OutputStream out, Serializable msg) throws Exception { - Kryo kryo = kryoLocal.get(); - if (kryo == null) - { - kryo = new Kryo(); - kryoLocal.set(kryo); - } + Kryo kryo = getKryo(); Output output = new Output(out); kryo.writeClassAndObject(output, msg); output.flush(); @@ -34,16 +51,22 @@ public void encode(OutputStream out, Serializable msg) throws Exception @Override public Object decode(InputStream in, ClassResolver classResolver) throws Exception { - Kryo kryo = kryoLocal.get(); - if (kryo == null) - { - kryo = new Kryo(); - kryoLocal.set(kryo); - } + Kryo kryo = getKryo(); Input input = new Input(in); Object ret = kryo.readClassAndObject(input); input.close(); return ret; } + private Kryo getKryo() + { + SoftReference kryoRef = kryoLocal.get(); + Kryo result = kryoRef == null ? null : kryoRef.get(); + if (result == null) + { + result = new Kryo(); + kryoLocal.set(new SoftReference(result)); + } + return result; + } } diff --git a/src/main/java/darks/grid/network/discovery/MERGE_NODES.java b/src/main/java/darks/grid/network/discovery/MERGE_NODES.java index 6b3b93b..919bf91 100644 --- a/src/main/java/darks/grid/network/discovery/MERGE_NODES.java +++ b/src/main/java/darks/grid/network/discovery/MERGE_NODES.java @@ -16,7 +16,6 @@ */ package darks.grid.network.discovery; -import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -25,6 +24,7 @@ import org.slf4j.LoggerFactory; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.beans.GridEvent; import darks.grid.beans.GridNode; import darks.grid.events.EventsChannel; @@ -48,7 +48,7 @@ public MERGE_NODES() public void findNodes() { GridNodesManager nodesManager = GridRuntime.nodes(); - Map nodeAddrMap = new HashMap(); + Map nodeAddrMap = new HashMap(); for (Entry entry : nodesManager.getNodesMap().entrySet()) { GridNode node = entry.getValue(); diff --git a/src/main/java/darks/grid/network/discovery/TCPPING.java b/src/main/java/darks/grid/network/discovery/TCPPING.java index 2a0ff0b..8c0ae5d 100644 --- a/src/main/java/darks/grid/network/discovery/TCPPING.java +++ b/src/main/java/darks/grid/network/discovery/TCPPING.java @@ -16,7 +16,6 @@ */ package darks.grid.network.discovery; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.LinkedHashSet; import java.util.Set; @@ -26,6 +25,7 @@ import darks.grid.GridException; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.manager.GridNodesManager; import darks.grid.utils.ParamsUtils; @@ -36,7 +36,7 @@ public class TCPPING extends GridDiscovery private static final Logger log = LoggerFactory.getLogger(TCPPING.class); - private Collection tryAddressList = null; + private Collection tryAddressList = null; private String hosts = null; @@ -48,21 +48,21 @@ public TCPPING() @Override public void findNodes() { - Set tryAddrs = new LinkedHashSet(); + Set tryAddrs = new LinkedHashSet(); GridNodesManager nodesManager = GridRuntime.nodes(); if (tryAddressList != null) { - for (InetSocketAddress address : tryAddressList) + for (GridAddress address : tryAddressList) { if (nodesManager.contains(address)) continue; tryAddrs.add(address); } } - Collection cacheAddrs = GridRuntime.storage().getCacheHistoryNodes(); + Collection cacheAddrs = GridRuntime.storage().getCacheHistoryNodes(); if (cacheAddrs != null) { - for (InetSocketAddress address : cacheAddrs) + for (GridAddress address : cacheAddrs) { if (nodesManager.contains(address)) continue; @@ -72,7 +72,7 @@ public void findNodes() if (!tryAddrs.isEmpty()) { log.info("TCPPING try to ping address " + tryAddrs.size()); - for (InetSocketAddress address : tryAddrs) + for (GridAddress address : tryAddrs) { if (nodesManager.contains(address)) continue; diff --git a/src/main/java/darks/grid/network/handler/msg/JOIN.java b/src/main/java/darks/grid/network/handler/msg/JOIN.java index 93663a7..972de90 100644 --- a/src/main/java/darks/grid/network/handler/msg/JOIN.java +++ b/src/main/java/darks/grid/network/handler/msg/JOIN.java @@ -16,7 +16,6 @@ */ package darks.grid.network.handler.msg; -import java.net.SocketAddress; import java.util.Map; import java.util.Map.Entry; @@ -24,6 +23,7 @@ import org.slf4j.LoggerFactory; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.beans.GridMessage; import darks.grid.beans.GridNode; import darks.grid.beans.meta.JoinMeta; @@ -98,10 +98,10 @@ else if (count == 1) private void handleRepeatChannel(JoinMeta meta, GridMessage msg) { String nodeId = meta.getNodeId(); - Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); + Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); long keepJoinTime = 0; JoinMeta keepJoinMeta = null; - for (Entry entry : nodesMap.entrySet()) + for (Entry entry : nodesMap.entrySet()) { JoinMeta joinMeta = entry.getValue(); if (keepJoinMeta == null || joinMeta.getJoinTime() < keepJoinTime) @@ -133,7 +133,7 @@ private void handleNewChannel(JoinMeta meta, GridMessage msg, boolean autoJoin) { if (success) GridRuntime.nodes().addRemoteNode(nodeId, meta.getSession(), meta.context()); - Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); + Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); nodesMap.clear(); } } diff --git a/src/main/java/darks/grid/network/handler/msg/JOIN_REPLY.java b/src/main/java/darks/grid/network/handler/msg/JOIN_REPLY.java index 1d3830b..6296114 100644 --- a/src/main/java/darks/grid/network/handler/msg/JOIN_REPLY.java +++ b/src/main/java/darks/grid/network/handler/msg/JOIN_REPLY.java @@ -16,7 +16,6 @@ */ package darks.grid.network.handler.msg; -import java.net.SocketAddress; import java.util.Map; import java.util.Map.Entry; @@ -24,6 +23,7 @@ import org.slf4j.LoggerFactory; import darks.grid.GridRuntime; +import darks.grid.beans.GridAddress; import darks.grid.beans.GridMessage; import darks.grid.beans.meta.JoinMeta; import darks.grid.beans.meta.JoinNodeMeta; @@ -43,8 +43,8 @@ public void handler(GridSession session, GridMessage msg) throws Exception String nodeId = meta.getNodeId(); synchronized (nodeId.intern()) { - Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); - for (Entry entry : nodesMap.entrySet()) + Map nodesMap = GridRuntime.network().getWaitJoin(nodeId); + for (Entry entry : nodesMap.entrySet()) { try { diff --git a/src/main/java/darks/grid/utils/ParamsUtils.java b/src/main/java/darks/grid/utils/ParamsUtils.java index 3cdca90..63e4bc2 100644 --- a/src/main/java/darks/grid/utils/ParamsUtils.java +++ b/src/main/java/darks/grid/utils/ParamsUtils.java @@ -1,6 +1,5 @@ package darks.grid.utils; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -17,6 +16,7 @@ import org.slf4j.LoggerFactory; import darks.grid.GridException; +import darks.grid.beans.GridAddress; public final class ParamsUtils { @@ -47,9 +47,9 @@ private ParamsUtils() * @param hosts * @return */ - public static Collection parseAddress(String hosts) + public static Collection parseAddress(String hosts) { - Set result = new LinkedHashSet(); + Set result = new LinkedHashSet(); String[] ipsArray = hosts.split(","); for (String strIps : ipsArray) { @@ -76,7 +76,7 @@ public static Collection parseAddress(String hosts) { for (Integer port : portList) { - result.add(new InetSocketAddress(ip, port)); + result.add(new GridAddress(ip, port)); } } } diff --git a/src/test/java/darks/grid/test/codec/CodecTest.java b/src/test/java/darks/grid/test/codec/CodecTest.java index c8210a9..c629e4e 100644 --- a/src/test/java/darks/grid/test/codec/CodecTest.java +++ b/src/test/java/darks/grid/test/codec/CodecTest.java @@ -27,8 +27,8 @@ public void testCodec() throws Exception { // final GridCodec codec = new KryoCodec(); // final GridCodec codec = new GenericCodec(); -// final GridCodec codec = new FSTCodec(); - final GridCodec codec = new HessianCodec(); + final GridCodec codec = new FSTCodec(); +// final GridCodec codec = new HessianCodec(); final AtomicInteger byteLen = new AtomicInteger(0); final AtomicLong costSum = new AtomicLong(0L); int threadCount = 10; diff --git a/src/test/java/grid-config.xml b/src/test/java/grid-config.xml index 5a82470..a67fa26 100644 --- a/src/test/java/grid-config.xml +++ b/src/test/java/grid-config.xml @@ -38,8 +38,9 @@ server_boss_thread_delta="2" client_worker_thread_number="4" nodes_expire_time="10min" - cache_history_nodes="false" - /> + cache_history_nodes="false"> + +