Skip to content

Commit

Permalink
[Dubbo-3653] etcd as config center (#3663)
Browse files Browse the repository at this point in the history
* Minor refactor, no functinoal change.

* Separate ConnectionStateListener

* Simplify code

* Fix typo

* Support get external config from etcd config center

* Polish diamond operator

* Initial etcd support as config center

* Add a put interface for JEtcdClient

* Enhanced Etcd config center support with the ability to watch and cancel watch

* Polish code

* Distinguish modification event and delete event

* Add etcd registry and configcenter to dubbo-all

* Watch again when connection is re-established
  • Loading branch information
ralf0131 authored and beiwei30 committed Mar 16, 2019
1 parent 7fefb81 commit cf23d61
Show file tree
Hide file tree
Showing 15 changed files with 811 additions and 158 deletions.
16 changes: 16 additions & 0 deletions dubbo-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-etcd3</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-monitor-api</artifactId>
Expand Down Expand Up @@ -360,6 +367,13 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-configcenter-etcd</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-compatible</artifactId>
Expand Down Expand Up @@ -494,6 +508,7 @@
<include>org.apache.dubbo:dubbo-registry-zookeeper</include>
<include>org.apache.dubbo:dubbo-registry-redis</include>
<include>org.apache.dubbo:dubbo-registry-consul</include>
<include>org.apache.dubbo:dubbo-registry-etcd3</include>
<include>org.apache.dubbo:dubbo-monitor-api</include>
<include>org.apache.dubbo:dubbo-monitor-default</include>
<include>org.apache.dubbo:dubbo-config-api</include>
Expand All @@ -515,6 +530,7 @@
<include>org.apache.dubbo:dubbo-configcenter-apollo</include>
<include>org.apache.dubbo:dubbo-configcenter-zookeeper</include>
<include>org.apache.dubbo:dubbo-configcenter-consul</include>
<include>org.apache.dubbo:dubbo-configcenter-etcd</include>
<include>org.apache.dubbo:dubbo-metadata-report-api</include>
<include>org.apache.dubbo:dubbo-metadata-definition</include>
<include>org.apache.dubbo:dubbo-metadata-report-redis</include>
Expand Down
5 changes: 5 additions & 0 deletions dubbo-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,11 @@
<artifactId>dubbo-configcenter-consul</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-configcenter-etcd</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-metadata-definition</artifactId>
Expand Down
46 changes: 46 additions & 0 deletions dubbo-configcenter/dubbo-configcenter-etcd/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dubbo-configcenter</artifactId>
<groupId>org.apache.dubbo</groupId>
<version>2.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>dubbo-configcenter-etcd</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>The etcd implementation of the config-center api</description>

<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-configcenter-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-remoting-etcd3</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.configcenter.support.etcd;

import com.google.protobuf.ByteString;
import io.etcd.jetcd.api.Event;
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchGrpc;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.configcenter.ConfigChangeEvent;
import org.apache.dubbo.configcenter.ConfigChangeType;
import org.apache.dubbo.configcenter.ConfigurationListener;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.remoting.etcd.StateListener;
import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;
import static org.apache.dubbo.common.Constants.PATH_SEPARATOR;

/**
* The etcd implementation of {@link DynamicConfiguration}
*/
public class EtcdDynamicConfiguration implements DynamicConfiguration {

/**
* The final root path would be: /$NAME_SPACE/config
*/
private String rootPath;

/**
* The etcd client
*/
private final JEtcdClient etcdClient;

/**
* The map store the key to {@link EtcdConfigWatcher} mapping
*/
private final ConcurrentMap<ConfigurationListener, EtcdConfigWatcher> watchListenerMap;

EtcdDynamicConfiguration(URL url) {
rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
etcdClient = new JEtcdClient(url);
etcdClient.addStateListener(state -> {
if (state == StateListener.CONNECTED) {
try {
recover();
} catch (Exception e) {
// ignore
}
}
});
watchListenerMap = new ConcurrentHashMap<>();
}

@Override
public void addListener(String key, String group, ConfigurationListener listener) {
if (watchListenerMap.get(listener) == null) {
String normalizedKey = convertKey(key);
EtcdConfigWatcher watcher = new EtcdConfigWatcher(normalizedKey, listener);
watchListenerMap.put(listener, watcher);
watcher.watch();
}
}

@Override
public void removeListener(String key, String group, ConfigurationListener listener) {
EtcdConfigWatcher watcher = watchListenerMap.get(listener);
watcher.cancelWatch();
}

// TODO Abstract the logic into super class
@Override
public String getConfig(String key, String group, long timeout) throws IllegalStateException {
if (StringUtils.isNotEmpty(group)) {
key = group + PATH_SEPARATOR + key;
} else {
int i = key.lastIndexOf(".");
key = key.substring(0, i) + PATH_SEPARATOR + key.substring(i + 1);
}
return (String) getInternalProperty(rootPath + PATH_SEPARATOR + key);
}

@Override
public Object getInternalProperty(String key) {
return etcdClient.getKVValue(key);
}


private String convertKey(String key) {
int index = key.lastIndexOf('.');
return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1);
}

private void recover() {
for (EtcdConfigWatcher watcher: watchListenerMap.values()) {
watcher.watch();
}
}

public class EtcdConfigWatcher implements StreamObserver<WatchResponse> {

private ConfigurationListener listener;
protected WatchGrpc.WatchStub watchStub;
private StreamObserver<WatchRequest> observer;
protected long watchId;
private ManagedChannel channel;
private String key;

public EtcdConfigWatcher(String key, ConfigurationListener listener) {
this.key = key;
this.listener = listener;
this.channel = etcdClient.getChannel();
}

@Override
public void onNext(WatchResponse watchResponse) {
this.watchId = watchResponse.getWatchId();
for (Event etcdEvent : watchResponse.getEventsList()) {
ConfigChangeType type = ConfigChangeType.MODIFIED;
if (etcdEvent.getType() == Event.EventType.DELETE) {
type = ConfigChangeType.DELETED;
}
ConfigChangeEvent event = new ConfigChangeEvent(
etcdEvent.getKv().getKey().toString(UTF_8),
etcdEvent.getKv().getValue().toString(UTF_8), type);
listener.process(event);
}
}

@Override
public void onError(Throwable throwable) {
// ignore
}

@Override
public void onCompleted() {
// ignore
}

public long getWatchId() {
return watchId;
}

private void watch() {
watchStub = WatchGrpc.newStub(channel);
observer = watchStub.watch(this);
WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
.setKey(ByteString.copyFromUtf8(key))
.setProgressNotify(true);
WatchRequest req = WatchRequest.newBuilder().setCreateRequest(builder).build();
observer.onNext(req);
}

private void cancelWatch() {
WatchCancelRequest watchCancelRequest =
WatchCancelRequest.newBuilder().setWatchId(watchId).build();
WatchRequest cancelRequest = WatchRequest.newBuilder()
.setCancelRequest(watchCancelRequest).build();
observer.onNext(cancelRequest);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.configcenter.support.etcd;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory;
import org.apache.dubbo.configcenter.DynamicConfiguration;

/**
* The etcd implementation of {@link AbstractDynamicConfigurationFactory}
*/
public class EtcdDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {

@Override
protected DynamicConfiguration createDynamicConfiguration(URL url) {
return new EtcdDynamicConfiguration(url);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
etcd=org.apache.dubbo.configcenter.support.etcd.EtcdDynamicConfigurationFactory
Loading

0 comments on commit cf23d61

Please sign in to comment.