Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dubbo-3653] etcd as config center #3663

Merged
merged 13 commits into from
Mar 16, 2019
16 changes: 16 additions & 0 deletions dubbo-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-etcd3</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-monitor-api</artifactId>
Expand Down Expand Up @@ -360,6 +367,13 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-configcenter-etcd</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-compatible</artifactId>
Expand Down Expand Up @@ -490,6 +504,7 @@
<include>org.apache.dubbo:dubbo-registry-zookeeper</include>
<include>org.apache.dubbo:dubbo-registry-redis</include>
<include>org.apache.dubbo:dubbo-registry-consul</include>
<include>org.apache.dubbo:dubbo-registry-etcd3</include>
<include>org.apache.dubbo:dubbo-monitor-api</include>
<include>org.apache.dubbo:dubbo-monitor-default</include>
<include>org.apache.dubbo:dubbo-config-api</include>
Expand All @@ -511,6 +526,7 @@
<include>org.apache.dubbo:dubbo-configcenter-apollo</include>
<include>org.apache.dubbo:dubbo-configcenter-zookeeper</include>
<include>org.apache.dubbo:dubbo-configcenter-consul</include>
<include>org.apache.dubbo:dubbo-configcenter-etcd</include>
<include>org.apache.dubbo:dubbo-metadata-report-api</include>
<include>org.apache.dubbo:dubbo-metadata-definition</include>
<include>org.apache.dubbo:dubbo-metadata-report-redis</include>
Expand Down
5 changes: 5 additions & 0 deletions dubbo-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,11 @@
<artifactId>dubbo-configcenter-consul</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-configcenter-etcd</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-metadata-definition</artifactId>
Expand Down
46 changes: 46 additions & 0 deletions dubbo-configcenter/dubbo-configcenter-etcd/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dubbo-configcenter</artifactId>
<groupId>org.apache.dubbo</groupId>
<version>2.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>dubbo-configcenter-etcd</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>The etcd implementation of the config-center api</description>

<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-configcenter-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-remoting-etcd3</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.configcenter.support.etcd;

import com.google.protobuf.ByteString;
import io.etcd.jetcd.api.Event;
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchGrpc;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.configcenter.ConfigChangeEvent;
import org.apache.dubbo.configcenter.ConfigChangeType;
import org.apache.dubbo.configcenter.ConfigurationListener;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.remoting.etcd.StateListener;
import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;
import static org.apache.dubbo.common.Constants.PATH_SEPARATOR;

/**
* The etcd implementation of {@link DynamicConfiguration}
*/
public class EtcdDynamicConfiguration implements DynamicConfiguration {

/**
* The final root path would be: /$NAME_SPACE/config
*/
private String rootPath;

/**
* The etcd client
*/
private final JEtcdClient etcdClient;

/**
* The map store the key to {@link EtcdConfigWatcher} mapping
*/
private final ConcurrentMap<ConfigurationListener, EtcdConfigWatcher> watchListenerMap;

EtcdDynamicConfiguration(URL url) {
rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
etcdClient = new JEtcdClient(url);
etcdClient.addStateListener(state -> {
if (state == StateListener.CONNECTED) {
try {
recover();
} catch (Exception e) {
// ignore
}
}
});
watchListenerMap = new ConcurrentHashMap<>();
}

@Override
public void addListener(String key, String group, ConfigurationListener listener) {
if (watchListenerMap.get(listener) == null) {
String normalizedKey = convertKey(key);
EtcdConfigWatcher watcher = new EtcdConfigWatcher(normalizedKey, listener);
watchListenerMap.put(listener, watcher);
watcher.watch();
}
}

@Override
public void removeListener(String key, String group, ConfigurationListener listener) {
EtcdConfigWatcher watcher = watchListenerMap.get(listener);
watcher.cancelWatch();
}

// TODO Abstract the logic into super class
@Override
public String getConfig(String key, String group, long timeout) throws IllegalStateException {
if (StringUtils.isNotEmpty(group)) {
key = group + PATH_SEPARATOR + key;
} else {
int i = key.lastIndexOf(".");
key = key.substring(0, i) + PATH_SEPARATOR + key.substring(i + 1);
}
return (String) getInternalProperty(rootPath + PATH_SEPARATOR + key);
}

@Override
public Object getInternalProperty(String key) {
return etcdClient.getKVValue(key);
}


private String convertKey(String key) {
int index = key.lastIndexOf('.');
return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1);
}

private void recover() {
for (EtcdConfigWatcher watcher: watchListenerMap.values()) {
watcher.watch();
}
}

public class EtcdConfigWatcher implements StreamObserver<WatchResponse> {

private ConfigurationListener listener;
protected WatchGrpc.WatchStub watchStub;
private StreamObserver<WatchRequest> observer;
protected long watchId;
private ManagedChannel channel;
private String key;

public EtcdConfigWatcher(String key, ConfigurationListener listener) {
this.key = key;
this.listener = listener;
this.channel = etcdClient.getChannel();
}

@Override
public void onNext(WatchResponse watchResponse) {
this.watchId = watchResponse.getWatchId();
for (Event etcdEvent : watchResponse.getEventsList()) {
ConfigChangeType type = ConfigChangeType.MODIFIED;
if (etcdEvent.getType() == Event.EventType.DELETE) {
type = ConfigChangeType.DELETED;
}
ConfigChangeEvent event = new ConfigChangeEvent(
etcdEvent.getKv().getKey().toString(UTF_8),
etcdEvent.getKv().getValue().toString(UTF_8), type);
listener.process(event);
}
}

@Override
public void onError(Throwable throwable) {
// ignore
}

@Override
public void onCompleted() {
// ignore
}

public long getWatchId() {
return watchId;
}

private void watch() {
watchStub = WatchGrpc.newStub(channel);
observer = watchStub.watch(this);
WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
.setKey(ByteString.copyFromUtf8(key))
.setProgressNotify(true);
WatchRequest req = WatchRequest.newBuilder().setCreateRequest(builder).build();
observer.onNext(req);
}

private void cancelWatch() {
WatchCancelRequest watchCancelRequest =
WatchCancelRequest.newBuilder().setWatchId(watchId).build();
WatchRequest cancelRequest = WatchRequest.newBuilder()
.setCancelRequest(watchCancelRequest).build();
observer.onNext(cancelRequest);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.configcenter.support.etcd;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory;
import org.apache.dubbo.configcenter.DynamicConfiguration;

/**
* The etcd implementation of {@link AbstractDynamicConfigurationFactory}
*/
public class EtcdDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {

@Override
protected DynamicConfiguration createDynamicConfiguration(URL url) {
return new EtcdDynamicConfiguration(url);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
etcd=org.apache.dubbo.configcenter.support.etcd.EtcdDynamicConfigurationFactory
Loading