Skip to content

Commit

Permalink
Use structured logging in core-amqp (#25671)
Browse files Browse the repository at this point in the history
* Switch to structured logging in azure-core-amqp
  • Loading branch information
lmolkova authored Jan 11, 2022
1 parent ea6e60f commit b1569cd
Show file tree
Hide file tree
Showing 35 changed files with 955 additions and 602 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.puppycrawl.tools.checkstyle.api.DetailAST;
import com.puppycrawl.tools.checkstyle.api.FullIdent;
import com.puppycrawl.tools.checkstyle.api.TokenTypes;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Queue;
Expand All @@ -25,12 +26,15 @@
public class ThrowFromClientLoggerCheck extends AbstractCheck {
private static final String LOGGER_LOG_EXCEPTION_AS_ERROR = "logger.logExceptionAsError";
private static final String LOGGER_LOG_THROWABLE_AS_ERROR = "logger.logThrowableAsError";
private static final String LOGGING_BUILDER_LOG_THROWABLE_AS_ERROR = "logger.atError().log";
private static final String LOGGER_LOG_EXCEPTION_AS_WARNING = "logger.logExceptionAsWarning";
private static final String LOGGER_LOG_THROWABLE_AS_WARNING = "logger.logThrowableAsWarning";
private static final String LOGGER_LOG_THROWABLE_AS_WARNING = "logger.logThrowableAsWarning";
private static final String LOGGING_BUILDER_LOG_THROWABLE_AS_WARNING = "logger.atWarning().log";

static final String THROW_LOGGER_EXCEPTION_MESSAGE = String.format("Directly throwing an exception is disallowed. "
+ "Must throw through \"ClientLogger\" API, either of \"%s\", \"%s\", \"%s\", or \"%s\" where \"logger\" is "
+ "Must throw through \"ClientLogger\" API, either of \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", or \"%s\" where \"logger\" is "
+ "type of ClientLogger from Azure Core package.", LOGGER_LOG_EXCEPTION_AS_ERROR,
LOGGER_LOG_THROWABLE_AS_ERROR, LOGGER_LOG_EXCEPTION_AS_WARNING, LOGGER_LOG_THROWABLE_AS_WARNING);
LOGGER_LOG_THROWABLE_AS_ERROR, LOGGING_BUILDER_LOG_THROWABLE_AS_ERROR, LOGGER_LOG_EXCEPTION_AS_WARNING, LOGGER_LOG_THROWABLE_AS_WARNING, LOGGING_BUILDER_LOG_THROWABLE_AS_WARNING);

// A LIFO queue stores the static status of class, skip this ThrowFromClientLoggerCheck if the class is static
private final Queue<Boolean> classStaticDeque = Collections.asLifoQueue(new ArrayDeque<>());
Expand Down Expand Up @@ -94,9 +98,11 @@ public void visitToken(DetailAST token) {
case TokenTypes.LITERAL_THROW:
// Skip check if the throw exception from static class, constructor or static method
if (classStaticDeque.isEmpty() || classStaticDeque.peek() || isInConstructor
|| methodStaticDeque.isEmpty() || methodStaticDeque.peek()) {
|| methodStaticDeque.isEmpty() || methodStaticDeque.peek()
|| findLogMethodIdentifier(token)) {
return;
}

DetailAST methodCallToken =
token.findFirstToken(TokenTypes.EXPR).findFirstToken(TokenTypes.METHOD_CALL);
if (methodCallToken == null) {
Expand All @@ -118,4 +124,27 @@ public void visitToken(DetailAST token) {
break;
}
}

/*
* Checks if the expression includes call to log(), which verifies logging builder call
* e.g. logger.atError().log(ex)
*/
private static boolean findLogMethodIdentifier(DetailAST root) {
for (DetailAST ast = root.getFirstChild(); ast != null; ast = ast.getNextSibling()) {
if (ast.getType() == TokenTypes.METHOD_CALL) {
DetailAST dot = ast.findFirstToken(TokenTypes.DOT);
if (dot != null) {
DetailAST ident = dot.findFirstToken(TokenTypes.IDENT);
if ("log".equals(ident.getText())) {
return true;
}
}
}
if (findLogMethodIdentifier(ast)) {
return true;
}
}

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.junit.Before;
import org.junit.Test;


import static com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck.THROW_LOGGER_EXCEPTION_MESSAGE;

public class ThrowFromClientLoggerCheckTest extends AbstractModuleTestSupport {
Expand All @@ -32,7 +33,8 @@ protected String getPackageLocation() {
@Test
public void directThrowExceptionTestData() throws Exception {
String[] expected = {
expectedErrorMessage(12, 9)
expectedErrorMessage(12, 9),
expectedErrorMessage(60, 9)
};
verify(checker, getPath("DirectThrowExceptionTestData.java"), expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,21 @@ public void validLogExceptionAsWarning() {
public void validLogThrowableAsWarning() {
throw logger.logThrowableAsWarning(new RuntimeException("Error message."));
}

public void validThrowExceptionWithBuilder() {
throw logger.atError().log(Exceptions.propagate(new IllegalStateException("Error Messages")));
}

public void validThrowExceptionWithBuilderAndContext() {
throw logger.atError().addKeyValuePair("foo", "bar").log(new RuntimeException("Error message."));
}

public void validThrowExceptionWithBuilderAndContextAdvanced() {
LoggingEventBuilder builder = logger.atError();
throw builder.addKeyValuePair("foo", "bar").log(new RuntimeException("Error message."));
}

public void invalidLoggingBuilderNoLogCall() {
throw logger.atError();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.azure.core.amqp;

import java.util.Locale;
import com.azure.core.util.logging.LoggingEventBuilder;

/**
* Represents a signal that caused the AMQP connection to shutdown.
Expand Down Expand Up @@ -47,11 +47,12 @@ public boolean isInitiatedByClient() {
}

/**
* {@inheritDoc}
* Returns String representing the message of this {@code AmqpShutdownSignal} signal.
*
* <strong>To write logs, please use {@link com.azure.core.amqp.implementation.AmqpLoggingUtils#addShutdownSignal(LoggingEventBuilder, AmqpShutdownSignal)}.</strong>
*/
@Override
public String toString() {
return String.format(Locale.US, "%s, isTransient[%s], initiatedByClient[%s]", message, isTransient,
isInitiatedByClient);
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.azure.core.amqp.implementation.AmqpLoggingUtils.addSignalTypeAndResult;
import static com.azure.core.amqp.implementation.ClientConstants.INTERVAL_KEY;

/**
* Manages the re-authorization of the client to the token audience against the CBS node.
*/
Expand Down Expand Up @@ -76,12 +79,14 @@ public Mono<Long> authorize() {

// If this is the first time authorize is called, the task will not have been scheduled yet.
if (!hasScheduled.getAndSet(true)) {
logger.info("Scheduling refresh token task. scopes[{}]", scopes);
logger.atInfo()
.addKeyValue("scopes", scopes)
.log("Scheduling refresh token task.");

final Duration firstInterval = Duration.ofMillis(refreshIntervalMS);
lastRefreshInterval.set(firstInterval);
authorizationResults.emitNext(AmqpResponseCode.ACCEPTED, (signalType, emitResult) -> {
logger.verbose("signalType[{}] result[{}] Could not emit ACCEPTED.", signalType, emitResult);
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult).log("Could not emit ACCEPTED.");
return false;
});

Expand All @@ -99,11 +104,13 @@ public void close() {
}

authorizationResults.emitComplete((signalType, emitResult) -> {
logger.verbose("signalType[{}] result[{}] Could not close authorizationResults.", signalType, emitResult);
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult).log("Could not close authorizationResults.");

return false;
});
durationSource.emitComplete((signalType, emitResult) -> {
logger.verbose("signalType[{}] result[{}] Could not close durationSource.", signalType, emitResult);
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult).log("Could not close durationSource.");

return false;
});

Expand All @@ -115,70 +122,85 @@ public void close() {
private Disposable scheduleRefreshTokenTask(Duration initialRefresh) {
// EmitterProcessor can queue up an initial refresh interval before any subscribers are received.
durationSource.emitNext(initialRefresh, (signalType, emitResult) -> {
logger.verbose("signalType[{}] result[{}] Could not emit initial refresh interval.", signalType,
emitResult);
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult).log("Could not emit initial refresh interval.");

return false;
});

return Flux.switchOnNext(durationSource.asFlux().map(Flux::interval))
.takeUntil(duration -> hasDisposed.get())
.flatMap(delay -> {
logger.info("Refreshing token. scopes[{}] ", scopes);

logger.atInfo()
.addKeyValue("scopes", scopes)
.log("Refreshing token.");

return authorize();
})
.onErrorContinue(
error -> (error instanceof AmqpException) && ((AmqpException) error).isTransient(),
(amqpException, interval) -> {
final Duration lastRefresh = lastRefreshInterval.get();

logger.error("Error is transient. Rescheduling authorization task at interval {} ms. scopes[{}]",
lastRefresh.toMillis(), scopes, amqpException);
logger.atError()
.addKeyValue("scopes", scopes)
.addKeyValue(INTERVAL_KEY, interval)
.log("Error is transient. Rescheduling authorization task.", amqpException);

durationSource.emitNext(lastRefresh, (signalType, emitResult) -> {
logger.verbose("signalType[{}] result[{}] Could not emit lastRefresh[{}].", signalType,
emitResult, lastRefresh);
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.addKeyValue("lastRefresh", lastRefresh)
.log("Could not emit lastRefresh.");

return false;
});
})
.subscribe(interval -> {
logger.verbose("Authorization successful. Refreshing token in {} ms. scopes[{}]", interval, scopes);
logger.atVerbose()
.addKeyValue("scopes", scopes)
.addKeyValue(INTERVAL_KEY, interval)
.log("Authorization successful. Refreshing token.");

authorizationResults.emitNext(AmqpResponseCode.ACCEPTED, (signalType, emitResult) -> {
logger.verbose("signalType[{}] result[{}] Could not emit ACCEPTED after refresh.", signalType,
emitResult);
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.log("Could not emit ACCEPTED after refresh.");
return false;
});

final Duration nextRefresh = Duration.ofMillis(interval);
lastRefreshInterval.set(nextRefresh);

durationSource.emitNext(nextRefresh, (signalType, emitResult) -> {
logger.verbose("signalType[{}] result[{}] Could not emit nextRefresh[{}].", signalType,
emitResult, nextRefresh);
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.addKeyValue("nextRefresh", nextRefresh)
.log("Could not emit nextRefresh.");

return false;
});
}, error -> {
logger.error("Error occurred while refreshing token that is not retriable. Not scheduling"
+ " refresh task. Use ActiveClientTokenManager.authorize() to schedule task again. audience[{}]"
+ " scopes[{}]", tokenAudience, scopes, error);

// This hasn't been disposed yet.
if (!hasDisposed.getAndSet(true)) {
hasScheduled.set(false);
durationSource.emitComplete((signalType, emitResult) -> {
logger.verbose("signalType[{}] result[{}] Could not close durationSource.", signalType,
emitResult);

return false;
});

authorizationResults.emitError(error, (signalType, emitResult) -> {
logger.verbose("signalType[{}] result[{}] Could not emit authorization error.", signalType,
emitResult, error);

return false;
});
}
});
logger.atError()
.addKeyValue("scopes", scopes)
.addKeyValue("audience", tokenAudience)
.log("Error occurred while refreshing token that is not retriable. Not scheduling"
+ " refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.", error);

// This hasn't been disposed yet.
if (!hasDisposed.getAndSet(true)) {
hasScheduled.set(false);
durationSource.emitComplete((signalType, emitResult) -> {
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.log("Could not close durationSource.");

return false;
});

authorizationResults.emitError(error, (signalType, emitResult) -> {
addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult)
.log("Could not emit authorization error.", error);

return false;
});
}
});
}
}
Loading

0 comments on commit b1569cd

Please sign in to comment.