Skip to content

Commit

Permalink
Merge pull request #656 from foxish/make-async
Browse files Browse the repository at this point in the history
  • Loading branch information
fusesource-ci authored Feb 10, 2017
2 parents aba2d55 + 305907a commit ad62c97
Showing 1 changed file with 52 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.dsl.internal;

import static io.fabric8.kubernetes.client.utils.Utils.isNotNullOrEmpty;
import static java.net.HttpURLConnection.HTTP_GONE;
package io.fabric8.kubernetes.client.dsl.internal;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.WatchEvent;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;

import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
import okhttp3.*;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.HttpUrl;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.fabric8.kubernetes.client.utils.Utils.isNotNullOrEmpty;
import static java.net.HttpURLConnection.HTTP_GONE;

public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>> implements
Watch {
Expand Down Expand Up @@ -79,9 +71,9 @@ public Thread newThread(Runnable r) {
});

public WatchHTTPManager(final OkHttpClient client,
final BaseOperation<T, L, ?, ?> baseOperation,
final String version, final Watcher<T> watcher, final int reconnectInterval,
final int reconnectLimit, long connectTimeout)
final BaseOperation<T, L, ?, ?> baseOperation,
final String version, final Watcher<T> watcher, final int reconnectInterval,
final int reconnectLimit, long connectTimeout)
throws MalformedURLException {

if (version == null) {
Expand All @@ -97,7 +89,7 @@ public WatchHTTPManager(final OkHttpClient client,

OkHttpClient clonedClient = client.newBuilder()
.connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
.readTimeout(0,TimeUnit.MILLISECONDS)
.readTimeout(0, TimeUnit.MILLISECONDS)
.cache(null)
.build();

Expand Down Expand Up @@ -147,42 +139,51 @@ private final void runWatch() {
.addHeader("Origin", requestUrl.getProtocol() + "://" + requestUrl.getHost() + ":" + requestUrl.getPort())
.build();

Response response = null;
try {
response = clonedClient.newCall(request).execute();
if(!response.isSuccessful()) {
throw OperationSupport.requestFailure(request,
OperationSupport.createStatus(response.code(), response.message()));
clonedClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
logger.info("Watch connection failed. reason: {}", e.getMessage());
scheduleReconnect();
}

BufferedSource source = response.body().source();
while (!source.exhausted()) {
String message = source.readUtf8LineStrict();
onMessage(message);
}
} catch (Exception e) {
logger.info("Watch connection close received. reason: {}", e.getMessage());
} finally {
if (forceClosed.get()) {
logger.warn("Ignoring onClose for already closed/closing connection");
return;
}
if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
watcher.onClose(new KubernetesClientException("Connection unexpectedly closed"));
return;
}
@Override
public void onResponse(Call call, Response response) throws IOException {
if (!response.isSuccessful()) {
throw OperationSupport.requestFailure(request,
OperationSupport.createStatus(response.code(), response.message()));
}

try {
BufferedSource source = response.body().source();
while (!source.exhausted()) {
String message = source.readUtf8LineStrict();
onMessage(message);
}
} catch (Exception e) {
logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage());
}

// if we get here, the source is exhausted, so, we have lost our "watch".
// we must reconnect.
if (response != null) {
response.body().close();
// if we get here, the source is exhausted, so, we have lost our "watch".
// we must reconnect.
if (response != null) {
response.body().close();
}
scheduleReconnect();
}
scheduleReconnect();
}
});
}

private void scheduleReconnect() {
if (forceClosed.get()) {
logger.warn("Ignoring error for already closed/closing connection");
return;
}

if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
watcher.onClose(new KubernetesClientException("Connection unexpectedly closed"));
return;
}

logger.debug("Submitting reconnect task to the executor");
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor.
Expand Down

0 comments on commit ad62c97

Please sign in to comment.