Skip to content

Commit

Permalink
Fix logging context propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Apr 4, 2024
1 parent 45fc2d0 commit b8f4bd3
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 59 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ status = warn
appender.console.type = Console
appender.console.name = consoleLogger
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %notEmpty{[%X{restateServiceMethod}]}%notEmpty{[%X{restateInvocationId}]} %c - %m%n
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %notEmpty{[%X{restateInvocationTarget}]}%notEmpty{[%X{restateInvocationId}]} %c - %m%n
# Filter out logging during replay
appender.console.filter.replay.type = ContextMapFilter
Expand All @@ -367,10 +367,12 @@ rootLogger.appenderRef.stdout.ref = consoleLogger

The SDK injects the following additional metadata to the logging context that can be used for filtering as well:

* `restateServiceMethod`: service and method, e.g. `counter.Counter/Add`.
* `restateInvocationTarget`: invocation target, e.g. `counter.Counter/Add`.
* `restateInvocationId`: Invocation identifier, to be used in Restate observability tools. See https://docs.restate.dev/operate/invocation#invocation-identifier.
* `restateInvocationStatus`: Invocation status, can be `WAITING_START`, `REPLAYING`, `PROCESSING`, `CLOSED`.

When assembling fat-jars, make sure to enable merging META-INF/services files. For more info, see https://github.com/apache/logging-log4j2/issues/2099.

#### Tracing with OpenTelemetry

The SDK can generate additional tracing information on top of what Restate already publishes. See https://docs.restate.dev/operate/monitoring/tracing to configure Restate tracing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Long get(ObjectContext ctx) {

@Handler
public CounterUpdateResult getAndAdd(ObjectContext ctx, Long request) {
LOG.info("Invoked get and add with " + request);
LOG.info("Invoked get and add with {}", request);

long currentValue = ctx.get(TOTAL).orElse(0L);
long newValue = currentValue + request;
Expand Down
4 changes: 4 additions & 0 deletions examples/src/main/kotlin/my/restate/sdk/examples/CounterKt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder
import dev.restate.sdk.kotlin.KtSerdes
import dev.restate.sdk.kotlin.ObjectContext
import kotlinx.serialization.Serializable
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger

@Serializable data class CounterUpdate(var oldValue: Long, val newValue: Long)

Expand All @@ -23,6 +25,7 @@ class CounterKt {

companion object {
private val TOTAL = StateKey.of<Long>("total", KtSerdes.json())
private val LOG: Logger = LogManager.getLogger(CounterKt::class.java)
}

@Handler
Expand All @@ -44,6 +47,7 @@ class CounterKt {

@Handler
suspend fun getAndAdd(ctx: ObjectContext, value: Long): CounterUpdate {
LOG.info("Invoked get and add with $value")
val currentValue = ctx.get(TOTAL) ?: 0L
val newValue = currentValue + value
ctx.set(TOTAL, newValue)
Expand Down
2 changes: 1 addition & 1 deletion examples/src/main/resources/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ status = warn
appender.console.type = Console
appender.console.name = consoleLogger
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %notEmpty{[%X{restateServiceMethod}]}%notEmpty{[%X{restateInvocationId}]} %c - %m%n
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %notEmpty{[%X{restateInvocationTarget}]}%notEmpty{[%X{restateInvocationId}]} %c - %m%n

# Filter out logging during replay
appender.console.filter.replay.type = ContextMapFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import dev.restate.sdk.common.syscalls.*
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asContextElement
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager

Expand Down Expand Up @@ -122,7 +123,10 @@ private constructor(
) {
val ctx: Context = ContextImpl(syscalls)

val scope = CoroutineScope(options.coroutineContext)
val scope =
CoroutineScope(
options.coroutineContext +
InvocationHandler.SYSCALLS_THREAD_LOCAL.asContextElement(syscalls))
scope.launch {
val serializedResult: ByteString

Expand Down
14 changes: 13 additions & 1 deletion sdk-api/src/main/java/dev/restate/sdk/Component.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,19 @@ public HandlerDefinition<Component.Options> toHandlerDefinition() {
@Override
public void handle(
Syscalls syscalls, Component.Options options, SyscallCallback<ByteString> callback) {
options.executor.execute(
// Wrap the executor for setting/unsetting the thread local
Executor wrapped =
runnable ->
options.executor.execute(
() -> {
SYSCALLS_THREAD_LOCAL.set(syscalls);
try {
runnable.run();
} finally {
SYSCALLS_THREAD_LOCAL.remove();
}
});
wrapped.execute(
() -> {
// Any context switching, if necessary, will be done by ResolvedEndpointHandler
Context ctx = new ContextImpl(syscalls);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
import com.google.protobuf.ByteString;

public interface InvocationHandler<O> {
/**
* Thread local to store {@link Syscalls}.
*
* <p>Implementations of {@link InvocationHandler} should correctly propagate this thread local in
* order for logging to work correctly. Could be improved if ScopedContext <a
* href="https://github.com/apache/logging-log4j2/pull/2438">will ever be introduced in
* log4j2</a>.
*/
ThreadLocal<Syscalls> SYSCALLS_THREAD_LOCAL = new ThreadLocal<>();

void handle(Syscalls syscalls, O options, SyscallCallback<ByteString> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class InvocationStateMachine implements InvocationFlow.InvocationProcessor {
private final String componentName;
private final String fullyQualifiedHandlerName;
private final Span span;
private final Consumer<InvocationState> transitionStateObserver;
private final RestateEndpoint.LoggingContextSetter loggingContextSetter;

private volatile InvocationState invocationState = InvocationState.WAITING_START;

Expand Down Expand Up @@ -64,11 +64,11 @@ class InvocationStateMachine implements InvocationFlow.InvocationProcessor {
String componentName,
String fullyQualifiedHandlerName,
Span span,
Consumer<InvocationState> transitionStateObserver) {
RestateEndpoint.LoggingContextSetter loggingContextSetter) {
this.componentName = componentName;
this.fullyQualifiedHandlerName = fullyQualifiedHandlerName;
this.span = span;
this.transitionStateObserver = transitionStateObserver;
this.loggingContextSetter = loggingContextSetter;

this.incomingEntriesStateMachine = new IncomingEntriesStateMachine();
this.readyResultStateMachine = new ReadyResultStateMachine();
Expand Down Expand Up @@ -190,6 +190,9 @@ void onStartMessage(MessageLite msg) {
Protocol.StartMessage.StateEntry::getKey,
Protocol.StartMessage.StateEntry::getValue)));

// Tracing and logging setup
this.loggingContextSetter.set(
RestateEndpoint.LoggingContextSetter.INVOCATION_ID_KEY, startMessage.getDebugId());
if (this.span.isRecording()) {
span.addEvent(
"Start", Attributes.of(Tracing.RESTATE_INVOCATION_ID, startMessage.getDebugId()));
Expand Down Expand Up @@ -687,7 +690,8 @@ private void transitionState(InvocationState newInvocationState) {
}
LOG.debug("Transitioning {} to {}", this, newInvocationState);
this.invocationState = newInvocationState;
this.transitionStateObserver.accept(newInvocationState);
this.loggingContextSetter.set(
RestateEndpoint.LoggingContextSetter.INVOCATION_STATUS_KEY, newInvocationState.toString());
}

private void incrementCurrentIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ public void start() {
stateMachine.startAndConsumeInput(
SyscallCallback.of(
request -> {
// Set invocation id in logging context
loggingContextSetter.setInvocationId(request.invocationId().toString());

// Prepare Syscalls object
SyscallsInternal syscalls =
this.syscallsExecutor != null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.core;

import dev.restate.sdk.common.syscalls.InvocationHandler;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.core.util.ContextDataProvider;

/**
* Log4j2 {@link ContextDataProvider} inferring context from {@link
* InvocationHandler#SYSCALLS_THREAD_LOCAL}.
*
* <p>This is used to propagate the context to the user code, such that log statements from the user
* will contain the restate logging context variables.
*/
public class RestateContextDataProvider implements ContextDataProvider {
@Override
public Map<String, String> supplyContextData() {
SyscallsInternal syscalls = (SyscallsInternal) InvocationHandler.SYSCALLS_THREAD_LOCAL.get();
if (syscalls == null) {
return Collections.emptyMap();
}

// We can't use the immutable MapN implementation from Map.of because of
// https://github.com/apache/logging-log4j2/issues/2098
HashMap<String, String> m = new HashMap<>(3);
m.put(
RestateEndpoint.LoggingContextSetter.INVOCATION_ID_KEY,
syscalls.request().invocationId().toString());
m.put(
RestateEndpoint.LoggingContextSetter.INVOCATION_TARGET_KEY,
syscalls.getFullyQualifiedMethodName());
m.put(
RestateEndpoint.LoggingContextSetter.INVOCATION_STATUS_KEY,
syscalls.getInvocationState().toString());
return m;
}
}
35 changes: 7 additions & 28 deletions sdk-core/src/main/java/dev/restate/sdk/core/RestateEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,13 @@ public ResolvedEndpointHandler resolve(
.startSpan();

// Setup logging context
loggingContextSetter.setServiceMethod(fullyQualifiedServiceMethod);
loggingContextSetter.set(
LoggingContextSetter.INVOCATION_TARGET_KEY, fullyQualifiedServiceMethod);

// Instantiate state machine, syscall and grpc bridge
InvocationStateMachine stateMachine =
new InvocationStateMachine(
componentName,
fullyQualifiedServiceMethod,
span,
s -> loggingContextSetter.setInvocationStatus(s.toString()));
componentName, fullyQualifiedServiceMethod, span, loggingContextSetter);

return new ResolvedEndpointHandlerImpl(
stateMachine, loggingContextSetter, handler.getHandler(), svc.options, syscallExecutor);
Expand Down Expand Up @@ -151,35 +149,16 @@ public RestateEndpoint build() {
* LoggingContextSetter#THREAD_LOCAL_INSTANCE}, though the caller of {@link RestateEndpoint} must
* take care of the cleanup of the thread local map.
*/
@FunctionalInterface
public interface LoggingContextSetter {

String INVOCATION_ID_KEY = "restateInvocationId";
String COMPONENT_HANDLER_KEY = "restateComponentHandler";
String INVOCATION_TARGET_KEY = "restateInvocationTarget";
String INVOCATION_STATUS_KEY = "restateInvocationStatus";

LoggingContextSetter THREAD_LOCAL_INSTANCE =
new LoggingContextSetter() {
@Override
public void setServiceMethod(String serviceMethod) {
ThreadContext.put(COMPONENT_HANDLER_KEY, serviceMethod);
}
LoggingContextSetter THREAD_LOCAL_INSTANCE = ThreadContext::put;

@Override
public void setInvocationId(String id) {
ThreadContext.put(INVOCATION_ID_KEY, id);
}

@Override
public void setInvocationStatus(String invocationStatus) {
ThreadContext.put(INVOCATION_STATUS_KEY, invocationStatus);
}
};

void setServiceMethod(String serviceMethod);

void setInvocationId(String id);

void setInvocationStatus(String invocationStatus);
void set(String key, String value);
}

private static class ComponentAdapterSingleton {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dev.restate.sdk.core.RestateContextDataProvider
Original file line number Diff line number Diff line change
Expand Up @@ -113,24 +113,7 @@ public void handle(HttpServerRequest request) {
componentName,
handlerName,
otelContext,
new RestateEndpoint.LoggingContextSetter() {
@Override
public void setServiceMethod(String serviceMethod) {
ContextualData.put(
RestateEndpoint.LoggingContextSetter.COMPONENT_HANDLER_KEY, serviceMethod);
}

@Override
public void setInvocationId(String id) {
ContextualData.put(RestateEndpoint.LoggingContextSetter.INVOCATION_ID_KEY, id);
}

@Override
public void setInvocationStatus(String invocationStatus) {
ContextualData.put(
RestateEndpoint.LoggingContextSetter.INVOCATION_STATUS_KEY, invocationStatus);
}
},
ContextualData::put,
currentContextExecutor(vertxCurrentContext));
} catch (ProtocolException e) {
LOG.warn("Error when resolving the handler", e);
Expand Down

0 comments on commit b8f4bd3

Please sign in to comment.