Skip to content

Commit

Permalink
Fix AmqpChannelProcessor to take loggingContext
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Jan 4, 2022
1 parent 0efd69e commit 90fe335
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ public void visitToken(DetailAST token) {
}
}

/**
*
**/
/*
* Checks if the expression includes call to log(), which verifies logging builder call
* e.g. logger.atError().log(ex)
*/
private static boolean findLogMethodIdentifier(DetailAST root) {
for (DetailAST ast = root.getFirstChild(); ast != null; ast = ast.getNextSibling()) {
if (ast.getType() == TokenTypes.METHOD_CALL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;

import static com.azure.core.amqp.implementation.AmqpLoggingUtils.createContextWithConnectionId;
import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.INTERVAL_KEY;

public class AmqpChannelProcessor<T> extends Mono<T> implements Processor<T, T>, CoreSubscriber<T>, Disposable {
Expand Down Expand Up @@ -71,16 +69,11 @@ public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath,
this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace);
}


public AmqpChannelProcessor(String fullyQualifiedNamespace, String entityPath, String connectionId,
Function<T, Flux<AmqpEndpointState>> endpointStatesFunction, AmqpRetryPolicy retryPolicy) {
public AmqpChannelProcessor(String fullyQualifiedNamespace, Function<T, Flux<AmqpEndpointState>> endpointStatesFunction, AmqpRetryPolicy retryPolicy, Map<String, Object> loggingContext) {
this.endpointStatesFunction = Objects.requireNonNull(endpointStatesFunction,
"'endpointStates' cannot be null.");
this.retryPolicy = Objects.requireNonNull(retryPolicy, "'retryPolicy' cannot be null.");

Map<String, Object> loggingContext = createContextWithConnectionId(connectionId);
loggingContext.put(ENTITY_PATH_KEY, Objects.requireNonNull(entityPath, "'entityPath' cannot be null."));
this.logger = new ClientLogger(AmqpChannelProcessor.class, loggingContext);
this.logger = new ClientLogger(AmqpChannelProcessor.class, Objects.requireNonNull(loggingContext, "'loggingContext' cannot be null."));

this.errorContext = new AmqpErrorContext(fullyQualifiedNamespace);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,7 @@ protected AmqpChannelProcessor<RequestResponseChannel> createRequestResponseChan
loggingContext.put(ENTITY_PATH_KEY, entityPath);

return createChannel
.subscribeWith(new AmqpChannelProcessor<>(getFullyQualifiedNamespace(), entityPath, connectionId,
channel -> channel.getEndpointStates(), retryPolicy));
.subscribeWith(new AmqpChannelProcessor<>(getFullyQualifiedNamespace(), channel -> channel.getEndpointStates(), retryPolicy, loggingContext));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import reactor.test.publisher.TestPublisher;

import java.time.Duration;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -55,9 +56,7 @@ class AmqpChannelProcessorTest {
@BeforeEach
void setup() {
mocksCloseable = MockitoAnnotations.openMocks(this);

channelProcessor = new AmqpChannelProcessor<>("namespace-test", "test-path", "connection-test",
TestObject::getStates, retryPolicy);
channelProcessor = new AmqpChannelProcessor<>("namespace-test", TestObject::getStates, retryPolicy, new HashMap<>());
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void setup(TestInfo testInfo) {

final AmqpChannelProcessor<RequestResponseChannel> requestResponseMono =
Mono.defer(() -> Mono.just(requestResponseChannel)).subscribeWith(new AmqpChannelProcessor<>(
"foo", "bar", "baz", RequestResponseChannel::getEndpointStates, retryPolicy));
"foo", RequestResponseChannel::getEndpointStates, retryPolicy, new HashMap<>()));

when(tokenManager.authorize()).thenReturn(Mono.just(1000L));
when(tokenManager.getAuthorizationResults()).thenReturn(tokenProviderResults.flux());
Expand Down

0 comments on commit 90fe335

Please sign in to comment.