Skip to content

Commit

Permalink
Polish code and fix some documentation errors (#3655)
Browse files Browse the repository at this point in the history
  • Loading branch information
kezhenxu94 authored and ralf0131 committed Mar 18, 2019
1 parent b2bfbc7 commit 79f0b7c
Showing 1 changed file with 48 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.dubbo.rpc.RpcException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -69,11 +70,10 @@ public class EtcdRegistry extends FailbackRegistry {

private final String root;

private final Set<String> anyServices = new ConcurrentHashSet<String>();
private final Set<String> anyServices = new ConcurrentHashSet<>();

private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<>();
private final EtcdClient etcdClient;
private long expirePeriod;

public EtcdRegistry(URL url, EtcdTransporter etcdTransporter) {
super(url);
Expand Down Expand Up @@ -121,7 +121,7 @@ public void doRegister(URL url) {
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to etcd " + getUrl()
+ ", cause: " + (OptionUtil.isProtocolError(e)
? "etcd3 registy maybe not supported yet or etcd3 registry not available."
? "etcd3 registry may not be supported yet or etcd3 registry is not available."
: e.getMessage()), e);
}
}
Expand All @@ -142,9 +142,9 @@ public void doSubscribe(URL url, NotifyListener listener) {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();

/**
* if we interesting all interfaces,
* we find current or create container for url, put or get only once.
/*
* if we are interested in all interfaces,
* find out the current container or create one for the url, put or get only once.
*/
ConcurrentMap<NotifyListener, ChildListener> listeners =
Optional.ofNullable(etcdListeners.get(url))
Expand All @@ -154,42 +154,40 @@ public void doSubscribe(URL url, NotifyListener listener) {
return prev != null ? prev : container;
});

/**
* if we have not interface watcher listener,
* we find current or create listener for current root, put or get only once.
/*
* if we have no interface watcher listener,
* find the current listener or create one for the current root, put or get only once.
*/
ChildListener interfaceListener =
Optional.ofNullable(listeners.get(listener))
.orElseGet(() -> {
ChildListener childListener, prev;
prev = listeners.putIfAbsent(listener, childListener = new ChildListener() {
public void childChanged(String parentPath, List<String> currentChildren) {
/**
* because etcd3 not support direct children watch events,
* we should filter not interface events. if we watch /dubbo
* and /dubbo/interface, when we put key-value pair {/dubbo/interface/hello hello},
* we will got events in watching path /dubbo.
*/
for (String child : currentChildren) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
/**
* if new interface event arrived, we watching direct children,
* eg: /dubbo/interface, /dubbo/interface and so on.
*/
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
prev = listeners.putIfAbsent(listener, childListener = (parentPath, currentChildren) -> {
/*
* because etcd3 does not support direct children watch events,
* we should filter not interface events. if we watch /dubbo
* and /dubbo/interface, when we put a key-value pair {/dubbo/interface/hello hello},
* we will got events in watching path /dubbo.
*/
for (String child : currentChildren) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
/*
* if new interface event arrived, we watch their direct children,
* eg: /dubbo/interface, /dubbo/interface and so on.
*/
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
return prev != null ? prev : childListener;
});

etcdClient.create(root);
/**
* first time, we want pull already interface and then watching direct children,
/*
* at the first time, we want to pull already interface and then watch their direct children,
* eg: /dubbo/interface, /dubbo/interface and so on.
*/
List<String> services = etcdClient.addChildListener(root, interfaceListener);
Expand All @@ -200,47 +198,42 @@ public void childChanged(String parentPath, List<String> currentChildren) {
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
} else {
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {

/**
* if we interesting special categories (providers, consumers, routers and so on),
* we find current or create container for url, put or get only once.
/*
* if we are interested in special categories (providers, consumers, routers and so on),
* we find out the current container or create one for the url, put or get only once.
*/
ConcurrentMap<NotifyListener, ChildListener> listeners =
Optional.ofNullable(etcdListeners.get(url))
.orElseGet(() -> {
ConcurrentMap<NotifyListener, ChildListener> container, prev;
prev = etcdListeners.putIfAbsent(url,
container = new ConcurrentHashMap<NotifyListener, ChildListener>());
container = new ConcurrentHashMap<>());
return prev != null ? prev : container;
});

/**
/*
* if we have no category watcher listener,
* we find current or create listener for current category, put or get only once.
* we find out the current listener or create one for the current category, put or get only once.
*/
ChildListener childListener =
Optional.ofNullable(listeners.get(listener))
.orElseGet(() -> {
ChildListener watchListener, prev;
prev = listeners.putIfAbsent(listener, watchListener = new ChildListener() {
public void childChanged(String parentPath, List<String> currentChildren) {
EtcdRegistry.this.notify(url, listener,
toUrlsWithEmpty(url, parentPath, currentChildren));
}
});
prev = listeners.putIfAbsent(listener, watchListener = (parentPath, currentChildren) -> EtcdRegistry.this.notify(url, listener,
toUrlsWithEmpty(url, parentPath, currentChildren)));
return prev != null ? prev : watchListener;
});

etcdClient.create(path);
/**
* first time, we want pull already category and then watching direct children,
/*
* at the first time, we want to pull already category and then watch their direct children,
* eg: /dubbo/interface/providers, /dubbo/interface/consumers and so on.
*/
List<String> children = etcdClient.addChildListener(path, childListener);
if (children != null) {
final String watchPath = path;
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
Expand All @@ -249,7 +242,7 @@ public void childChanged(String parentPath, List<String> currentChildren) {
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to etcd " + getUrl()
+ ", cause: " + (OptionUtil.isProtocolError(e)
? "etcd3 registy maybe not supported yet or etcd3 registry not available."
? "etcd3 registry may not be supported yet or etcd3 registry is not available."
: e.getMessage()), e);
}
}
Expand All @@ -260,7 +253,7 @@ public void doUnsubscribe(URL url, NotifyListener listener) {
if (listeners != null) {
ChildListener etcdListener = listeners.get(listener);
if (etcdListener != null) {
// maybe url has many subscribe path
// maybe url has many subscribed paths
for (String path : toUnsubscribedPath(url)) {
etcdClient.removeChildListener(path, etcdListener);
}
Expand Down Expand Up @@ -303,16 +296,16 @@ protected String toServicePath(URL url) {
}

protected String[] toCategoriesPath(URL url) {
String[] categroies;
String[] categories;
if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
categroies = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
} else {
categroies = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
}
String[] paths = new String[categroies.length];
for (int i = 0; i < categroies.length; i++) {
paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categroies[i];
String[] paths = new String[categories.length];
for (int i = 0; i < categories.length; i++) {
paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i];
}
return paths;
}
Expand All @@ -335,9 +328,7 @@ protected List<String> toUnsubscribedPath(URL url) {
categories.add(group);
return categories;
} else {
for (String path : toCategoriesPath(url)) {
categories.add(path);
}
categories.addAll(Arrays.asList(toCategoriesPath(url)));
}
return categories;
}
Expand Down

0 comments on commit 79f0b7c

Please sign in to comment.