Skip to content

Commit

Permalink
Convert RxNetty to native Netty (Azure#257)
Browse files Browse the repository at this point in the history
* Add file request body

* More rx adapter

* Add more code

* Implement shared channel pool

* Functional impl of shared channel pool

* Clean ups

* Rename RxNetty -> Netty

* Implement LRU for multi hash map

* Fix checkstyle

* add netty client & netty adapter

* Merge and fix errors

* Use an executor for pooling

* Fix 1 core machine hanging

* Checkstyle
  • Loading branch information
jianghaolu authored Oct 17, 2017
1 parent f8a07d0 commit db8e593
Show file tree
Hide file tree
Showing 22 changed files with 1,066 additions and 414 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
import com.microsoft.rest.annotations.POST;
import com.microsoft.rest.annotations.PathParam;
import com.microsoft.rest.http.HttpClient;
import com.microsoft.rest.http.HttpClient.Configuration;
import com.microsoft.rest.http.RxNettyAdapter;
import com.microsoft.rest.http.NettyClient;
import com.microsoft.rest.policy.RequestPolicy;
import com.microsoft.rest.protocol.SerializerAdapter;
import rx.Single;
Expand All @@ -46,8 +45,8 @@ final class RefreshTokenClient {
}

private static HttpClient createHttpClient(Proxy proxy) {
return new RxNettyAdapter.Factory()
.create(new Configuration(Collections.<RequestPolicy.Factory>emptyList(), proxy));
return new NettyClient.Factory()
.create(new HttpClient.Configuration(Collections.<RequestPolicy.Factory>emptyList(), proxy));
}

AuthenticationResult refreshToken(String tenant, String clientId, String resource, String refreshToken, boolean isMultipleResoureRefreshToken) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.microsoft.azure;

import com.microsoft.rest.http.HttpClient;
import com.microsoft.rest.http.HttpClient.Configuration;
import com.microsoft.rest.http.NettyClient;
import com.microsoft.rest.policy.RequestPolicy;

import java.util.Collections;

public class AzureProxyToRestProxyWithNettyTests extends AzureProxyToRestProxyTests {
private final NettyClient.Factory nettyClientFactory = new NettyClient.Factory();

@Override
protected HttpClient createHttpClient() {
return nettyClientFactory.create(new Configuration(Collections.<RequestPolicy.Factory>emptyList(), null));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,23 @@
import com.microsoft.azure.MockResource;
import com.microsoft.azure.ResourceWithProvisioningState;
import com.microsoft.azure.ProvisioningState;
import com.microsoft.rest.http.ByteArrayHttpRequestBody;
import com.microsoft.rest.http.FileRequestBody;
import com.microsoft.rest.http.FileSegment;
import com.microsoft.rest.http.HttpClient;
import com.microsoft.rest.http.HttpHeader;
import com.microsoft.rest.http.HttpHeaders;
import com.microsoft.rest.http.HttpRequest;
import com.microsoft.rest.http.HttpResponse;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import rx.Single;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

Expand Down
15 changes: 6 additions & 9 deletions client-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,9 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxnetty-http</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxnetty-common</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<artifactId>netty-handler</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
Expand Down Expand Up @@ -104,6 +96,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
15 changes: 3 additions & 12 deletions client-runtime/src/main/java/com/microsoft/rest/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package com.microsoft.rest;

import com.microsoft.rest.credentials.ServiceClientCredentials;
import com.microsoft.rest.http.HttpClient;
import com.microsoft.rest.http.NettyClient;
import com.microsoft.rest.policy.AddCookiesPolicy;
import com.microsoft.rest.policy.CredentialsPolicy;
import com.microsoft.rest.policy.LoggingPolicy;
Expand All @@ -15,14 +17,10 @@
import com.microsoft.rest.policy.UserAgentPolicy;
import com.microsoft.rest.protocol.Environment;
import com.microsoft.rest.protocol.SerializerAdapter;
import com.microsoft.rest.http.ChannelHandlerConfig;
import com.microsoft.rest.http.HttpClient;
import com.microsoft.rest.http.RxNettyAdapter;
import com.microsoft.rest.serializer.JacksonAdapter;

import java.net.Proxy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -141,13 +139,6 @@ public String userAgent() {
return userAgent;
}

/**
* @return a new initialized instance of the default HttpClient type.
*/
public static HttpClient createDefaultHttpClient() {
return new RxNettyAdapter(Collections.<RequestPolicy.Factory>emptyList(), Collections.<ChannelHandlerConfig>emptyList());
}

/**
* @return a new initialized instance of the default SerializerAdapter type.
*/
Expand Down Expand Up @@ -197,7 +188,7 @@ private Builder(final RestClient restClient) {
* Creates an instance of the builder.
*/
public Builder() {
this.httpClientFactory = new RxNettyAdapter.Factory();
this.httpClientFactory = new NettyClient.Factory();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ public int contentLength() {
return contents.length;
}

/**
* @return the content of the request, in the format of a byte array.
*/
public byte[] content() {
return contents;
}

@Override
public String contentType() {
return contentType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/

package com.microsoft.rest.http;

import com.google.common.base.Predicate;
import com.google.common.collect.Sets;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A thread-safe multi map where the values for a certain key are FIFO organized.
* @param <K> the key type
* @param <V> the value type
*/
public class ConcurrentMultiHashMap<K, V> {
private final Map<K, ConcurrentLinkedQueue<V>> data;
private final AtomicInteger size;

/**
* Create a concurrent multi hash map.
*/
public ConcurrentMultiHashMap() {
this.data = Collections.synchronizedMap(new LinkedHashMap<K, ConcurrentLinkedQueue<V>>(16, 0.75f, true));
this.size = new AtomicInteger(0);
}

/**
* Add a new key value pair to the multimap.
*
* @param key the key to put
* @param value the value to put
* @return the added value
*/
public V put(K key, V value) {
synchronized (size) {
if (!data.containsKey(key)) {
data.put(key, new ConcurrentLinkedQueue<V>());
}
data.get(key).add(value);
size.incrementAndGet();
return value;
}
}

/**
* Returns the queue associated with the given key.
*
* @param key the key to query
* @return the queue associated with the key
*/
public ConcurrentLinkedQueue<V> get(K key) {
return data.get(key);
}

/**
* Retrieves and removes one item from the multi map. The item is from
* the least recently used key set.
* @return the item removed from the map
*/
public V poll() {
synchronized (size) {
if (size.get() == 0) {
return null;
} else {
K key;
synchronized (data) {
Iterator<K> keys = data.keySet().iterator();
key = keys.next();
}
return poll(key);
}
}
}

/**
* Retrieves the least recently used item in the queue for the given key.
*
* @param key the key to poll an item
* @return the least recently used item for the key
*/
public V poll(K key) {
if (!data.containsKey(key)) {
return null;
} else {
ConcurrentLinkedQueue<V> queue = data.get(key);
V ret;
synchronized (size) {
size.decrementAndGet();
ret = queue.poll();
}
if (queue.isEmpty()) {
data.remove(key);
}
return ret;
}
}

/**
* @return the size of the multimap.
*/
public int size() {
return size.get();
}

/**
* Checks if there are values associated with a key in the multimap.
*
* @param key the key to check
* @return true if there are values associated
*/
public boolean containsKey(K key) {
return data.containsKey(key) && data.get(key).size() > 0;
}

/**
* @return the set of keys with which there are values associated
*/
public Set<K> keys() {
return Sets.filter(data.keySet(), new Predicate<K>() {
@Override
public boolean apply(K input) {
return data.get(input).size() > 0;
}
});
}

/**
* @return the set of all values for all keys in the multimap.
*/
public Set<V> values() {
Set<V> values = new HashSet<>();
for (K k : keys()) {
values.addAll(data.get(k));
}
return values;
}

/**
* Removes a key value pair in the multimap. If there's no such key value
* pair then this returns false. Otherwise this method removes it and
* returns true.
*
* @param key the key to remove
* @param value the value to remove
* @return true if an item is removed
*/
public boolean remove(K key, V value) {
if (!data.containsKey(key)) {
return false;
}
ConcurrentLinkedQueue<V> queue = data.get(key);
boolean removed;
synchronized (size) {
size.decrementAndGet();
removed = queue.remove(value);
}
if (queue.isEmpty()) {
data.remove(key);
}
return removed;
}
}
Loading

0 comments on commit db8e593

Please sign in to comment.