Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polish code and fix some documentation errors #3655

Merged
merged 1 commit into from
Mar 18, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.dubbo.rpc.RpcException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -69,11 +70,10 @@ public class EtcdRegistry extends FailbackRegistry {

private final String root;

private final Set<String> anyServices = new ConcurrentHashSet<String>();
private final Set<String> anyServices = new ConcurrentHashSet<>();

private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<>();
private final EtcdClient etcdClient;
private long expirePeriod;

public EtcdRegistry(URL url, EtcdTransporter etcdTransporter) {
super(url);
Expand All @@ -86,14 +86,12 @@ public EtcdRegistry(URL url, EtcdTransporter etcdTransporter) {
}
this.root = group;
etcdClient = etcdTransporter.connect(url);
etcdClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == CONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
etcdClient.addStateListener(state -> {
if (state == StateListener.CONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
Expand Down Expand Up @@ -123,7 +121,7 @@ public void doRegister(URL url) {
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to etcd " + getUrl()
+ ", cause: " + (OptionUtil.isProtocolError(e)
? "etcd3 registy maybe not supported yet or etcd3 registry not available."
? "etcd3 registry may not be supported yet or etcd3 registry is not available."
: e.getMessage()), e);
}
}
Expand All @@ -144,9 +142,9 @@ public void doSubscribe(URL url, NotifyListener listener) {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();

/**
* if we interesting all interfaces,
* we find current or create container for url, put or get only once.
/*
* if we are interested in all interfaces,
* find out the current container or create one for the url, put or get only once.
*/
ConcurrentMap<NotifyListener, ChildListener> listeners =
Optional.ofNullable(etcdListeners.get(url))
Expand All @@ -156,42 +154,40 @@ public void doSubscribe(URL url, NotifyListener listener) {
return prev != null ? prev : container;
});

/**
* if we have not interface watcher listener,
* we find current or create listener for current root, put or get only once.
/*
* if we have no interface watcher listener,
* find the current listener or create one for the current root, put or get only once.
*/
ChildListener interfaceListener =
Optional.ofNullable(listeners.get(listener))
.orElseGet(() -> {
ChildListener childListener, prev;
prev = listeners.putIfAbsent(listener, childListener = new ChildListener() {
public void childChanged(String parentPath, List<String> currentChildren) {
/**
* because etcd3 not support direct children watch events,
* we should filter not interface events. if we watch /dubbo
* and /dubbo/interface, when we put key-value pair {/dubbo/interface/hello hello},
* we will got events in watching path /dubbo.
*/
for (String child : currentChildren) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
/**
* if new interface event arrived, we watching direct children,
* eg: /dubbo/interface, /dubbo/interface and so on.
*/
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
prev = listeners.putIfAbsent(listener, childListener = (parentPath, currentChildren) -> {
/*
* because etcd3 does not support direct children watch events,
* we should filter not interface events. if we watch /dubbo
* and /dubbo/interface, when we put a key-value pair {/dubbo/interface/hello hello},
* we will got events in watching path /dubbo.
*/
for (String child : currentChildren) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
/*
* if new interface event arrived, we watch their direct children,
* eg: /dubbo/interface, /dubbo/interface and so on.
*/
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
return prev != null ? prev : childListener;
});

etcdClient.create(root);
/**
* first time, we want pull already interface and then watching direct children,
/*
* at the first time, we want to pull already interface and then watch their direct children,
* eg: /dubbo/interface, /dubbo/interface and so on.
*/
List<String> services = etcdClient.addChildListener(root, interfaceListener);
Expand All @@ -202,47 +198,42 @@ public void childChanged(String parentPath, List<String> currentChildren) {
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
} else {
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {

/**
* if we interesting special categories (providers, consumers, routers and so on),
* we find current or create container for url, put or get only once.
/*
* if we are interested in special categories (providers, consumers, routers and so on),
* we find out the current container or create one for the url, put or get only once.
*/
ConcurrentMap<NotifyListener, ChildListener> listeners =
Optional.ofNullable(etcdListeners.get(url))
.orElseGet(() -> {
ConcurrentMap<NotifyListener, ChildListener> container, prev;
prev = etcdListeners.putIfAbsent(url,
container = new ConcurrentHashMap<NotifyListener, ChildListener>());
container = new ConcurrentHashMap<>());
return prev != null ? prev : container;
});

/**
/*
* if we have no category watcher listener,
* we find current or create listener for current category, put or get only once.
* we find out the current listener or create one for the current category, put or get only once.
*/
ChildListener childListener =
Optional.ofNullable(listeners.get(listener))
.orElseGet(() -> {
ChildListener watchListener, prev;
prev = listeners.putIfAbsent(listener, watchListener = new ChildListener() {
public void childChanged(String parentPath, List<String> currentChildren) {
EtcdRegistry.this.notify(url, listener,
toUrlsWithEmpty(url, parentPath, currentChildren));
}
});
prev = listeners.putIfAbsent(listener, watchListener = (parentPath, currentChildren) -> EtcdRegistry.this.notify(url, listener,
toUrlsWithEmpty(url, parentPath, currentChildren)));
return prev != null ? prev : watchListener;
});

etcdClient.create(path);
/**
* first time, we want pull already category and then watching direct children,
/*
* at the first time, we want to pull already category and then watch their direct children,
* eg: /dubbo/interface/providers, /dubbo/interface/consumers and so on.
*/
List<String> children = etcdClient.addChildListener(path, childListener);
if (children != null) {
final String watchPath = path;
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
Expand All @@ -251,7 +242,7 @@ public void childChanged(String parentPath, List<String> currentChildren) {
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to etcd " + getUrl()
+ ", cause: " + (OptionUtil.isProtocolError(e)
? "etcd3 registy maybe not supported yet or etcd3 registry not available."
? "etcd3 registry may not be supported yet or etcd3 registry is not available."
: e.getMessage()), e);
}
}
Expand All @@ -262,7 +253,7 @@ public void doUnsubscribe(URL url, NotifyListener listener) {
if (listeners != null) {
ChildListener etcdListener = listeners.get(listener);
if (etcdListener != null) {
// maybe url has many subscribe path
// maybe url has many subscribed paths
for (String path : toUnsubscribedPath(url)) {
etcdClient.removeChildListener(path, etcdListener);
}
Expand Down Expand Up @@ -305,16 +296,16 @@ protected String toServicePath(URL url) {
}

protected String[] toCategoriesPath(URL url) {
String[] categroies;
String[] categories;
if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
categroies = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
} else {
categroies = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
}
String[] paths = new String[categroies.length];
for (int i = 0; i < categroies.length; i++) {
paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categroies[i];
String[] paths = new String[categories.length];
for (int i = 0; i < categories.length; i++) {
paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i];
}
return paths;
}
Expand All @@ -337,15 +328,13 @@ protected List<String> toUnsubscribedPath(URL url) {
categories.add(group);
return categories;
} else {
for (String path : toCategoriesPath(url)) {
categories.add(path);
}
categories.addAll(Arrays.asList(toCategoriesPath(url)));
}
return categories;
}

protected List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
if (providers != null && providers.size() > 0) {
for (String provider : providers) {
provider = URL.decode(provider);
Expand Down