Skip to content
This repository has been archived by the owner on May 11, 2021. It is now read-only.

Commit

Permalink
[#13] Propagating correlation ID when retrying
Browse files Browse the repository at this point in the history
  • Loading branch information
nurkiewicz committed Dec 4, 2014
1 parent 3e6e48d commit 6234752
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import org.springframework.web.client.RestOperations
* </pre>
*
* in the following manner (example for POST):
*
*
* <pre>
* serviceRestClient.forService('users').post()
* .onUrl('/some/url/to/service')
* .body('<loan><id>100</id><name>Smith</name></loan>')
Expand All @@ -36,16 +37,42 @@ import org.springframework.web.client.RestOperations
* .andExecuteFor()
* .aResponseEntity()
* .ofType(String)
*
* </pre>
*
* If you want to send a request to the outside world you can also profit from this component as follows (example for google.com):
*
*<pre>
* serviceRestClient.forExternalService().get()
* .onUrl('http://google.com')
* .andExecuteFor()
* .aResponseEntity()
* .ofType(String)
*
* </pre>
*
* This client has built in retrying mechanism supported:
*
* <pre>
* @@Autowired
* AsyncRetryExecutor executor
*
* serviceRestClient
* .forExternalService()
* .retryUsing(
* executor
* .withMaxRetries(5)
* .withFixedBackoff(2_000)
* .withUniformJitter())
* .delete()
* .onUrl(SOME_SERVICE_URL)
* .ignoringResponseAsync()
* </pre>
*
* If you are using retry mechanism, another features is enabled - asynchronous invocation. By appending <code>Async</code>
* to last method you will get <code>ListenableFuture</code> instance. This way you can easily run multiple requests
* concurrently, combine them, etc.
*
* @see <a href="https://github.com/4finance/micro-deps">micro-deps project</a>
* @see <a href="">async-retry</a>
*/
@CompileStatic
class ServiceRestClient {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package com.ofg.infrastructure.web.resttemplate.fluent

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.nurkiewicz.asyncretry.AsyncRetryExecutor
import com.ofg.infrastructure.discovery.ServiceConfigurationResolver
import com.ofg.infrastructure.discovery.ServiceResolver
import com.ofg.infrastructure.web.resttemplate.custom.RestTemplate
import groovy.transform.CompileStatic
import org.springframework.beans.BeansException
import org.springframework.beans.factory.annotation.Value
import org.springframework.beans.factory.config.BeanFactoryPostProcessor
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.Ordered
import org.springframework.web.client.RestOperations

import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ThreadFactory

/**
* Creates a bean of abstraction over {@link RestOperations}.
*
Expand All @@ -32,6 +39,21 @@ class ServiceRestClientConfiguration {
return new RestTemplate()
}

@Bean
AsyncRetryExecutor retryExecutor(@Value('${retry.threads:10}') int retryPoolThreads) {
return new AsyncRetryExecutor(retryExecutorService(retryPoolThreads))
}

ScheduledExecutorService retryExecutorService(@Value('${retry.threads:10}') int retryPoolThreads) {
return Executors.newScheduledThreadPool(retryPoolThreads, retryThreadFactory())
}

ThreadFactory retryThreadFactory() {
new ThreadFactoryBuilder()
.setNameFormat(AsyncRetryExecutor.simpleName + "-%d")
.build()
}

@Bean
static RestTemplateAutowireCandidateFalsePostProcessor disableMicroInfraSpringRestTemplateAutowiring() {
return new RestTemplateAutowireCandidateFalsePostProcessor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ abstract class ResponseTypeRelatedRequestsExecutor<T> {
protected abstract SpringHttpMethod getHttpMethod()

ResponseEntity<T> exchange() {
return exchangeAsync().get()
return restExecutor.exchange(httpMethod, params, responseType)
}

ListenableFuture<ResponseEntity<T>> exchangeAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package com.ofg.infrastructure.web.resttemplate.fluent.common.response.executor

import com.google.common.util.concurrent.ListenableFuture
import com.nurkiewicz.asyncretry.RetryExecutor
import groovy.transform.CompileStatic
import com.nurkiewicz.asyncretry.SyncRetryExecutor
import com.ofg.infrastructure.correlationid.CorrelationIdHolder
import com.ofg.infrastructure.correlationid.CorrelationIdUpdater
import groovy.transform.TypeChecked
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
Expand All @@ -26,22 +28,32 @@ final class RestExecutor<T> {
}

ResponseEntity<T> exchange(HttpMethod httpMethod, Map params, Class<T> responseType) {
return exchangeAsync(httpMethod, params, responseType).get()
return exchangeInternal(params, httpMethod, responseType).get()
}

ListenableFuture<ResponseEntity<T>> exchangeAsync(HttpMethod httpMethod, Map params, Class<T> responseType) {
throwIfAsyncWithoutExecutor()
return exchangeInternal(params, httpMethod, responseType)
}

private ListenableFuture<ResponseEntity<T>> exchangeInternal(Map params, HttpMethod httpMethod, Class<T> responseType) {
if (params.url) {
return callUrlWithRetry(httpMethod, params, responseType)
return urlExchange(httpMethod, params, responseType)
} else if (params.urlTemplate) {
return callUrlTemplateWithRetry(httpMethod, params, responseType)
return urlTemplateExchange(httpMethod, params, responseType)
}
throw new InvalidHttpMethodParametersException(params)
}

protected ListenableFuture<ResponseEntity<T>> callUrlTemplateWithRetry(HttpMethod httpMethod, Map params, Class<T> responseType) {
return retryExecutor.getWithRetry {
//TODO Correlation ID
return restOperations.exchange(
private void throwIfAsyncWithoutExecutor() {
if (retryExecutor == SyncRetryExecutor.INSTANCE)
throw new IllegalStateException("Async execution is only enabled with retrying executor. Try .retryUsing(executor.dontRetry()) ")
}


private ListenableFuture<ResponseEntity<T>> urlTemplateExchange(HttpMethod httpMethod, Map params, Class<T> responseType) {
return withRetry {
restOperations.exchange(
appendPathToHost(params.host as String, params.urlTemplate as String),
httpMethod,
getHttpEntityFrom(params),
Expand All @@ -50,17 +62,26 @@ final class RestExecutor<T> {
}
}

protected ListenableFuture<ResponseEntity<T>> callUrlWithRetry(HttpMethod httpMethod, Map params, Class<T> responseType) {
return retryExecutor.getWithRetry {
//TODO Correlation ID
restOperations.exchange(

private ListenableFuture<ResponseEntity<T>> urlExchange(HttpMethod httpMethod, Map params, Class<T> responseType) {
return withRetry {
return restOperations.exchange(
new URI(appendPathToHost(params.host as String, params.url as URI)),
httpMethod,
getHttpEntityFrom(params),
responseType)
}
}

private ListenableFuture<ResponseEntity<T>> withRetry(Closure<ResponseEntity<T>> httpInvocation) {
String correlationId = CorrelationIdHolder.get()
return retryExecutor.getWithRetry {
return CorrelationIdUpdater.withId(correlationId) {
return httpInvocation.call()
}
}
}

static HttpEntity<Object> getHttpEntityFrom(Map params) {
if (params.httpEntity) {
return params.httpEntity as HttpEntity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import com.google.common.util.concurrent.ListenableFuture
* Interface that defines what is the type of the received response.
* It will return an object of provided class.
*/
abstract class ObjectReceiving {
interface ObjectReceiving {

public <T> T ofType(Class<T> responseType) {
return ofTypeAsync(responseType).get()
}
public <T> T ofType(Class<T> responseType)

public abstract <T> ListenableFuture<T> ofTypeAsync(Class<T> responseType)
public <T> ListenableFuture<T> ofTypeAsync(Class<T> responseType)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import org.springframework.http.ResponseEntity
* Interface that defines what is the type of the received response.
* It will return a {@link ResponseEntity} of the provided class.
*/
abstract class ResponseEntityReceiving {
public abstract <T> ListenableFuture<ResponseEntity<T>> ofTypeAsync(Class<T> responseType)
public <T> ResponseEntity<T> ofType(Class<T> responseType) {
return ofTypeAsync(responseType).get()
}
interface ResponseEntityReceiving {
public <T> ListenableFuture<ResponseEntity<T>> ofTypeAsync(Class<T> responseType)
public <T> ResponseEntity<T> ofType(Class<T> responseType)
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ class GetMethodBuilder implements GetMethod, UrlParameterizableGetMethod, Respon
@Override
ObjectReceiving anObject() {
return new ObjectReceiving() {
@Override
def <T> T ofType(Class<T> responseType) {
return get(responseType).exchange()?.body
}

@Override
public <T> ListenableFuture<T> ofTypeAsync(Class<T> responseType) {
GetExecuteForResponseTypeRelated<T> get = get(responseType)
Expand All @@ -98,6 +103,11 @@ class GetMethodBuilder implements GetMethod, UrlParameterizableGetMethod, Respon
public <T> ListenableFuture<ResponseEntity<T>> ofTypeAsync(Class<T> responseType) {
return get(responseType).exchangeAsync()
}

@Override
def <T> ResponseEntity<T> ofType(Class<T> responseType) {
return get(responseType).exchange()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ class OptionsMethodBuilder implements
@Override
ObjectReceiving anObject() {
return new ObjectReceiving() {
@Override
def <T> T ofType(Class<T> responseType) {
return options(responseType).exchange()?.body
}

@Override
public <T> ListenableFuture<T> ofTypeAsync(Class<T> responseType) {
def future = options(responseType).exchangeAsync()
Expand All @@ -104,6 +109,11 @@ class OptionsMethodBuilder implements
public <T> ListenableFuture<ResponseEntity<T>> ofTypeAsync(Class<T> responseType) {
return options(responseType).exchangeAsync()
}

@Override
def <T> ResponseEntity<T> ofType(Class<T> responseType) {
return options(responseType).exchange()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ class PostMethodBuilder extends LocationFindingExecutor implements
@Override
ObjectReceiving anObject() {
return new ObjectReceiving() {
@Override
def <T> T ofType(Class<T> responseType) {
return post(responseType).exchange()?.body
}

@Override
public <T> ListenableFuture<T> ofTypeAsync(Class<T> responseType) {
ListenableFuture<ResponseEntity<T>> future = post(responseType).exchangeAsync()
Expand All @@ -114,6 +119,11 @@ class PostMethodBuilder extends LocationFindingExecutor implements
public <T> ListenableFuture<ResponseEntity<T>> ofTypeAsync(Class<T> responseType) {
return post(responseType).exchangeAsync()
}

@Override
def <T> ResponseEntity<T> ofType(Class<T> responseType) {
return post(responseType).exchange()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ class PutMethodBuilder extends LocationFindingExecutor implements
@Override
ObjectReceiving anObject() {
return new ObjectReceiving() {
@Override
def <T> T ofType(Class<T> responseType) {
return put(responseType).exchange()?.body
}

@Override
public <T> ListenableFuture<T> ofTypeAsync(Class<T> responseType) {
ListenableFuture<ResponseEntity> future = put(responseType).exchangeAsync()
Expand All @@ -115,6 +120,11 @@ class PutMethodBuilder extends LocationFindingExecutor implements
public <T> ListenableFuture<ResponseEntity<T>> ofTypeAsync(Class<T> responseType) {
return put(responseType).exchangeAsync()
}

@Override
def <T> ResponseEntity<T> ofType(Class<T> responseType) {
return put(responseType).exchange()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import static org.springframework.http.HttpMethod.DELETE
import static org.springframework.http.HttpMethod.GET
import static org.springframework.http.HttpMethod.HEAD
import static org.springframework.http.HttpMethod.POST
import static org.springframework.http.HttpMethod.PUT

class ServiceRestClientSpec extends Specification {

Expand Down Expand Up @@ -180,4 +181,29 @@ class ServiceRestClientSpec extends Specification {
throw new RestClientException("Simulated")
} >> null
}

def 'should fail when multiple retries of PUT failed'() {
given:
AsyncRetryExecutor executor = new AsyncRetryExecutor(pool)
when:
serviceRestClient
.forExternalService()
.retryUsing(executor.withMaxRetries(2).withNoDelay())
.put()
.onUrl(SOME_SERVICE_URL)
.body('')
.ignoringResponseAsync()
.get()
then:
3 * restOperations.exchange(_, PUT, _ as HttpEntity, _ as Class) >>> [] >> {
throw new RestClientException("Simulated A")
} >> {
throw new RestClientException("Simulated B")
} >> {
throw new RestClientException("Simulated C")
} >> null
Exception e = thrown(Exception)
e.message.contains("Simulated C")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.ofg.infrastructure.web.resttemplate.fluent.common.response.executor

import com.nurkiewicz.asyncretry.SyncRetryExecutor
import com.ofg.infrastructure.web.resttemplate.custom.RestTemplate
import org.springframework.http.HttpMethod
import spock.lang.Specification

class RestExecutorTest extends Specification {

def "should fail to run asynchronously if retry mechanism wasn't set up"() {
given:
RestExecutor executor = new RestExecutor(
new RestTemplate(), SyncRetryExecutor.INSTANCE)
when:
executor.exchangeAsync(HttpMethod.PUT, [:], Object)

then:
IllegalStateException e = thrown(IllegalStateException)
e.message.contains("retryUsing")
}

}

0 comments on commit 6234752

Please sign in to comment.