Skip to content

Commit

Permalink
Merge pull request Azure#247 from Azure/ImmutableOperationStatus
Browse files Browse the repository at this point in the history
Make OperationStatus immutable
  • Loading branch information
Dan Schulte authored Oct 3, 2017
2 parents 95a25b7 + 2351902 commit 5755604
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,16 @@ public static <A> A create(Class<A> swaggerInterface, String baseUrl, final Http
protected Object handleSyncHttpResponse(HttpRequest httpRequest, HttpResponse httpResponse, SwaggerMethodParser methodParser) throws IOException, InterruptedException {
final SerializerAdapter<?> serializer = serializer();

final OperationStatus<Object> operationStatus = new OperationStatus<>(httpRequest, httpResponse, serializer);
while (!operationStatus.isDone()) {
operationStatus.delay();
final PollStrategy pollStrategy = createPollStrategy(httpRequest, httpResponse, serializer);
if (pollStrategy != null) {
while (!pollStrategy.isDone()) {
pollStrategy.delay();

final HttpRequest pollRequest = operationStatus.createPollRequest();
httpResponse = sendHttpRequest(pollRequest);
final HttpRequest pollRequest = pollStrategy.createPollRequest();
httpResponse = sendHttpRequest(pollRequest);

operationStatus.updateFrom(httpResponse);
pollStrategy.updateFrom(httpResponse);
}
}

return super.handleSyncHttpResponse(httpRequest, httpResponse, methodParser);
Expand All @@ -133,19 +135,19 @@ protected Object handleAsyncHttpResponse(final HttpRequest httpRequest, Single<H
.flatMap(new Func1<HttpResponse, Single<? extends HttpResponse>>() {
@Override
public Single<? extends HttpResponse> call(HttpResponse httpResponse) {
final OperationStatus<Object> operationStatus = new OperationStatus<>(httpRequest, httpResponse, serializer);
final PollStrategy pollStrategy = createPollStrategy(httpRequest, httpResponse, serializer);

Single<HttpResponse> result;
if (operationStatus.isDone()) {
if (pollStrategy == null || pollStrategy.isDone()) {
result = Single.just(httpResponse);
}
else {
result = sendPollRequestWithDelay(operationStatus)
result = sendPollRequestWithDelay(pollStrategy)
.repeat()
.takeUntil(new Func1<HttpResponse, Boolean>() {
@Override
public Boolean call(HttpResponse ignored) {
return operationStatus.isDone();
return pollStrategy.isDone();
}
})
.last()
Expand All @@ -169,22 +171,22 @@ else if (returnTypeToken.isSubtypeOf(Observable.class)) {
.flatMap(new Func1<HttpResponse, Observable<OperationStatus<Object>>>() {
@Override
public Observable<OperationStatus<Object>> call(HttpResponse httpResponse) {
final OperationStatus<Object> operationStatus = new OperationStatus<>(httpRequest, httpResponse, serializer);
final PollStrategy pollStrategy = createPollStrategy(httpRequest, httpResponse, serializer);

Observable<OperationStatus<Object>> result;
if (operationStatus.isDone()) {
result = toCompletedOperationStatusObservable(operationStatus, httpRequest, httpResponse, methodParser, operationStatusResultType);
if (pollStrategy == null || pollStrategy.isDone()) {
result = toCompletedOperationStatusObservable(httpRequest, httpResponse, methodParser, operationStatusResultType);
} else {
result = sendPollRequestWithDelay(operationStatus)
result = sendPollRequestWithDelay(pollStrategy)
.flatMap(new Func1<HttpResponse, Observable<OperationStatus<Object>>>() {
@Override
public Observable<OperationStatus<Object>> call(HttpResponse httpResponse) {
Observable<OperationStatus<Object>> result;
if (!operationStatus.isDone()) {
result = Observable.just(operationStatus);
if (!pollStrategy.isDone()) {
result = Observable.just(new OperationStatus<>(pollStrategy));
}
else {
result = toCompletedOperationStatusObservable(operationStatus, httpRequest, httpResponse, methodParser, operationStatusResultType);
result = toCompletedOperationStatusObservable(httpRequest, httpResponse, methodParser, operationStatusResultType);
}
return result;
}
Expand All @@ -206,35 +208,65 @@ public Boolean call(OperationStatus<Object> operationStatus) {
return result;
}

private Observable<OperationStatus<Object>> toCompletedOperationStatusObservable(OperationStatus<Object> operationStatus, HttpRequest httpRequest, HttpResponse httpResponse, SwaggerMethodParser methodParser, Type operationStatusResultType) {
private static PollStrategy createPollStrategy(HttpRequest httpRequest, HttpResponse httpResponse, SerializerAdapter<?> serializer) {
PollStrategy result = null;

final int httpStatusCode = httpResponse.statusCode();
if (httpStatusCode != 200) {
final String fullyQualifiedMethodName = httpRequest.callerMethod();
final String originalHttpRequestMethod = httpRequest.httpMethod();
final String originalHttpRequestUrl = httpRequest.url();

if (originalHttpRequestMethod.equalsIgnoreCase("PUT") || originalHttpRequestMethod.equalsIgnoreCase("PATCH")) {
if (httpStatusCode == 201) {
result = AzureAsyncOperationPollStrategy.tryToCreate(fullyQualifiedMethodName, httpResponse, originalHttpRequestUrl, serializer);
} else if (httpStatusCode == 202) {
result = AzureAsyncOperationPollStrategy.tryToCreate(fullyQualifiedMethodName, httpResponse, originalHttpRequestUrl, serializer);
if (result == null) {
result = LocationPollStrategy.tryToCreate(fullyQualifiedMethodName, httpResponse);
}
}
} else /* if (originalRequestHttpMethod.equalsIgnoreCase("DELETE") || originalRequestHttpMethod.equalsIgnoreCase("POST") */ {
if (httpStatusCode == 202) {
result = AzureAsyncOperationPollStrategy.tryToCreate(fullyQualifiedMethodName, httpResponse, originalHttpRequestUrl, serializer);
if (result == null) {
result = LocationPollStrategy.tryToCreate(fullyQualifiedMethodName, httpResponse);
}
}
}
}

return result;
}

private Observable<OperationStatus<Object>> toCompletedOperationStatusObservable(HttpRequest httpRequest, HttpResponse httpResponse, SwaggerMethodParser methodParser, Type operationStatusResultType) {
Observable<OperationStatus<Object>> result;
try {
final Object resultObject = super.handleSyncHttpResponse(httpRequest, httpResponse, methodParser, operationStatusResultType);
operationStatus.setResult(resultObject);
result = Observable.just(operationStatus);
result = Observable.just(new OperationStatus<>(resultObject));
} catch (IOException e) {
result = Observable.error(e);
}
return result;
}

private Observable<HttpResponse> sendPollRequestWithDelay(final OperationStatus<Object> operationStatus) {
private Observable<HttpResponse> sendPollRequestWithDelay(final PollStrategy pollStrategy) {
return Observable.defer(new Func0<Observable<HttpResponse>>() {
@Override
public Observable<HttpResponse> call() {
return operationStatus
return pollStrategy
.delayAsync()
.flatMap(new Func1<Void, Single<HttpResponse>>() {
@Override
public Single<HttpResponse> call(Void ignored) {
final HttpRequest pollRequest = operationStatus.createPollRequest();
final HttpRequest pollRequest = pollStrategy.createPollRequest();
return sendHttpRequestAsync(pollRequest);
}
})
.flatMap(new Func1<HttpResponse, Single<HttpResponse>>() {
@Override
public Single<HttpResponse> call(HttpResponse response) {
return operationStatus.updateFromAsync(response);
return pollStrategy.updateFromAsync(response);
}
})
.toObservable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,6 @@

package com.microsoft.azure;

import com.microsoft.rest.protocol.SerializerAdapter;
import com.microsoft.rest.http.HttpRequest;
import com.microsoft.rest.http.HttpResponse;
import rx.Single;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* The current state of polling for the result of a long running operation.
* @param <T> The type of value that will be returned from the long running operation.
Expand All @@ -23,102 +15,28 @@ public class OperationStatus<T> {
private T result;

/**
* Create a new OperationStatus from the provided HTTP response.
* @param originalHttpRequest The HttpRequest that initiated the long running operation.
* @param originalHttpResponse The HttpResponse from the request that initiated the long running
* operation.
* @param serializer The serializer used to deserialize the response body.
*/
OperationStatus(HttpRequest originalHttpRequest, HttpResponse originalHttpResponse, SerializerAdapter<?> serializer) {
final int httpStatusCode = originalHttpResponse.statusCode();

if (httpStatusCode != 200) {
final String fullyQualifiedMethodName = originalHttpRequest.callerMethod();
final String originalHttpRequestMethod = originalHttpRequest.httpMethod();
final String originalHttpRequestUrl = originalHttpRequest.url();

if (originalHttpRequestMethod.equalsIgnoreCase("PUT") || originalHttpRequestMethod.equalsIgnoreCase("PATCH")) {
if (httpStatusCode == 201) {
pollStrategy = AzureAsyncOperationPollStrategy.tryToCreate(fullyQualifiedMethodName, originalHttpResponse, originalHttpRequestUrl, serializer);
} else if (httpStatusCode == 202) {
pollStrategy = AzureAsyncOperationPollStrategy.tryToCreate(fullyQualifiedMethodName, originalHttpResponse, originalHttpRequestUrl, serializer);
if (pollStrategy == null) {
pollStrategy = LocationPollStrategy.tryToCreate(fullyQualifiedMethodName, originalHttpResponse);
}
}
} else /* if (originalRequestHttpMethod.equalsIgnoreCase("DELETE") || originalRequestHttpMethod.equalsIgnoreCase("POST") */ {
if (httpStatusCode == 202) {
pollStrategy = AzureAsyncOperationPollStrategy.tryToCreate(fullyQualifiedMethodName, originalHttpResponse, originalHttpRequestUrl, serializer);
if (pollStrategy == null) {
pollStrategy = LocationPollStrategy.tryToCreate(fullyQualifiedMethodName, originalHttpResponse);
}
}
}
}
}

/**
* Update the properties of this OperationStatus from the provided response.
* @param httpResponse The HttpResponse from the most recent request.
* Create a new OperationStatus with the provided PollStrategy.
* @param pollStrategy The polling strategy that the OperationStatus will use to check the
* progress of a long running operation.
*/
void updateFrom(HttpResponse httpResponse) throws IOException {
pollStrategy.updateFrom(httpResponse);
OperationStatus(PollStrategy pollStrategy) {
this.pollStrategy = pollStrategy;
}

/**
* Update the properties of this OperationStatus from the provided HTTP poll response.
* @param httpPollResponse The response of the most recent poll request.
* @return A Single that can be used to chain off of this operation.
* Create a new OperationStatus with the provided result.
* @param result The final result of a long running operation.
*/
Single<HttpResponse> updateFromAsync(HttpResponse httpPollResponse) {
return pollStrategy.updateFromAsync(httpPollResponse);
OperationStatus(T result) {
this.result = result;
}

/**
* Get whether or not the long running operation is done.
* @return Whether or not the long running operation is done.
*/
public boolean isDone() {
return pollStrategy == null || pollStrategy.isDone();
}

/**
* Create a HttpRequest that will get the next polling status update for the long running
* operation.
* @return A HttpRequest that will get the next polling status update for the long running
* operation.
*/
HttpRequest createPollRequest() {
return pollStrategy.createPollRequest();
}

/**
* If this OperationStatus has a retryAfterSeconds value, delay (and block) the current thread for
* the number of seconds that are in the retryAfterSeconds value. If this OperationStatus doesn't
* have a retryAfterSeconds value, then just return.
*/
void delay() throws InterruptedException {
final long delayInMilliseconds = pollStrategy.delayInMilliseconds();
if (delayInMilliseconds > 0) {
Thread.sleep(delayInMilliseconds);
}
}

/**
* If this OperationStatus has a retryAfterSeconds value, return an Single that is delayed by the
* number of seconds that are in the retryAfterSeconds value. If this OperationStatus doesn't have
* a retryAfterSeconds value, then return an Single with no delay.
* @return A Single with delay if this OperationStatus has a retryAfterSeconds value.
*/
Single<Void> delayAsync() {
Single<Void> result = Single.just(null);

final long delayInMilliseconds = pollStrategy.delayInMilliseconds();
if (delayInMilliseconds > 0) {
result = result.delay(delayInMilliseconds, TimeUnit.MILLISECONDS);
}

return result;
return pollStrategy == null;
}

/**
Expand All @@ -129,12 +47,4 @@ Single<Void> delayAsync() {
public T result() {
return result;
}

/**
* Set the result of this OperationStatus.
* @param result The result to assign to this OperationStatus.
*/
void setResult(T result) {
this.result = result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import rx.Single;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* An abstract class for the different strategies that an OperationStatus can use when checking the
Expand Down Expand Up @@ -53,6 +54,33 @@ final void updateDelayInMillisecondsFrom(HttpResponse httpPollResponse) {
}
}

/**
* If this PollStrategy has a retryAfterSeconds value, delay (and block) the current thread for
* the number of seconds that are in the retryAfterSeconds value. If this PollStrategy doesn't
* have a retryAfterSeconds value, then just return.
*/
void delay() throws InterruptedException {
if (delayInMilliseconds > 0) {
Thread.sleep(delayInMilliseconds);
}
}

/**
* If this OperationStatus has a retryAfterSeconds value, return an Single that is delayed by the
* number of seconds that are in the retryAfterSeconds value. If this OperationStatus doesn't have
* a retryAfterSeconds value, then return an Single with no delay.
* @return A Single with delay if this OperationStatus has a retryAfterSeconds value.
*/
Single<Void> delayAsync() {
Single<Void> result = Single.just(null);

if (delayInMilliseconds > 0) {
result = result.delay(delayInMilliseconds, TimeUnit.MILLISECONDS);
}

return result;
}

/**
* Create a new HTTP poll request.
* @return A new HTTP poll request.
Expand Down

0 comments on commit 5755604

Please sign in to comment.