Skip to content

Commit

Permalink
WIP: ExecutorService instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbarny committed Jan 11, 2019
1 parent 58084b0 commit 51df3c4
Show file tree
Hide file tree
Showing 16 changed files with 403 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static net.bytebuddy.matcher.ElementMatchers.any;
import static net.bytebuddy.matcher.ElementMatchers.nameContains;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.not;

public class ElasticApmAgent {

Expand Down Expand Up @@ -298,7 +299,7 @@ private static AgentBuilder getAgentBuilder(final ByteBuddy byteBuddy, final Cor
: AgentBuilder.PoolStrategy.Default.FAST)
.ignore(any(), isReflectionClassLoader())
.or(any(), classLoaderWithName("org.codehaus.groovy.runtime.callsite.CallSiteClassLoader"))
.or(nameStartsWith("java."))
.or(nameStartsWith("java.").and(not(nameStartsWith("java.util.concurrent."))))
.or(nameStartsWith("com.sun."))
.or(nameStartsWith("sun"))
.or(nameStartsWith("org.aspectj."))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*-
* #%L
* Elastic APM Java agent
* %%
* Copyright (C) 2018-2019 Elastic and contributors
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package co.elastic.apm.agent.impl;


import co.elastic.apm.agent.bci.VisibleForAdvice;
import co.elastic.apm.agent.impl.transaction.TraceContext;
import co.elastic.apm.agent.objectpool.Recyclable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

@VisibleForAdvice
public class ContextInScopeRunnableWrapper implements Runnable, Recyclable {
private static final Logger logger = LoggerFactory.getLogger(ContextInScopeRunnableWrapper.class);
private final ElasticApmTracer tracer;
private final TraceContext context;
@Nullable
private volatile Runnable delegate;

ContextInScopeRunnableWrapper(ElasticApmTracer tracer) {
this.tracer = tracer;
context = TraceContext.with64BitId(tracer);
}

ContextInScopeRunnableWrapper wrap(Runnable delegate, TraceContext context) {
this.context.copyFrom(context);
// ordering is important: volatile write has to be after copying the TraceContext to ensure visibility in #run
this.delegate = delegate;
return this;
}

// Exceptions in the agent may never affect the monitored application
// normally, advices act as the boundary of user and agent code and exceptions are handled via @Advice.OnMethodEnter(suppress = Throwable.class)
// In this case, this class acts as the boundary of user and agent code so we have to do the tedious exception handling here
@Override
public void run() {
try {
context.activate();
} catch (Throwable t) {
try {
logger.error("Unexpected error while activating span", t);
} catch (Throwable ignore) {
}
}
try {
//noinspection ConstantConditions
delegate.run();
} catch (Exception e) {
// although the corresponding span may be ended at this point,
// we still have a copy of it's TraceContext so we can track errors here
// (in contrast to SpanInScopeRunnableWrapper)
context.captureException(e);
} finally {
try {
context.deactivate();
tracer.recycle(this);
} catch (Throwable t) {
try {
logger.error("Unexpected error while activating span", t);
} catch (Throwable ignore) {
}
}
}
}

@Override
public void resetState() {
context.resetState();
delegate = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,21 @@
public class ElasticApmTracer {
private static final Logger logger = LoggerFactory.getLogger(ElasticApmTracer.class);

/**
* The number of required {@link Runnable} wrappers does not depend on the size of the disruptor
* but rather on the amount of application threads.
* The requirement increases if the application tends to wrap multiple {@link Runnable}s.
*/
private static final int MAX_POOLED_RUNNABLES = 256;

private final ConfigurationRegistry configurationRegistry;
private final StacktraceConfiguration stacktraceConfiguration;
private final Iterable<LifecycleListener> lifecycleListeners;
private final ObjectPool<Transaction> transactionPool;
private final ObjectPool<Span> spanPool;
private final ObjectPool<ErrorCapture> errorPool;
private final ObjectPool<InScopeRunnableWrapper> runnableWrapperObjectPool;
private final ObjectPool<SpanInScopeRunnableWrapper> runnableSpanWrapperObjectPool;
private final ObjectPool<ContextInScopeRunnableWrapper> runnableContextWrapperObjectPool;
private final Reporter reporter;
// Maintains a stack of all the activated spans
// This way its easy to retrieve the bottom of the stack (the transaction)
Expand Down Expand Up @@ -111,11 +119,20 @@ public ErrorCapture createInstance() {
return new ErrorCapture(ElasticApmTracer.this);
}
});
runnableWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<InScopeRunnableWrapper>newQueue(createBoundedMpmc(maxPooledElements)), false,
new Allocator<InScopeRunnableWrapper>() {
// consider specialized object pools which return the objects to the thread-local pool of their originating thread
// with a combination of DetachedThreadLocal and org.jctools.queues.MpscRelaxedArrayQueue
runnableSpanWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<SpanInScopeRunnableWrapper>newQueue(createBoundedMpmc(MAX_POOLED_RUNNABLES)), false,
new Allocator<SpanInScopeRunnableWrapper>() {
@Override
public SpanInScopeRunnableWrapper createInstance() {
return new SpanInScopeRunnableWrapper(ElasticApmTracer.this);
}
});
runnableContextWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<ContextInScopeRunnableWrapper>newQueue(createBoundedMpmc(MAX_POOLED_RUNNABLES)), false,
new Allocator<ContextInScopeRunnableWrapper>() {
@Override
public InScopeRunnableWrapper createInstance() {
return new InScopeRunnableWrapper(ElasticApmTracer.this);
public ContextInScopeRunnableWrapper createInstance() {
return new ContextInScopeRunnableWrapper(ElasticApmTracer.this);
}
});
sampler = ProbabilitySampler.of(coreConfiguration.getSampleRate().get());
Expand Down Expand Up @@ -301,11 +318,25 @@ public void recycle(ErrorCapture error) {
}

public Runnable wrapRunnable(Runnable delegate, AbstractSpan<?> span) {
return runnableWrapperObjectPool.createInstance().wrap(delegate, span);
if (delegate instanceof SpanInScopeRunnableWrapper) {
return delegate;
}
return runnableSpanWrapperObjectPool.createInstance().wrap(delegate, span);
}

public void recycle(SpanInScopeRunnableWrapper wrapper) {
runnableSpanWrapperObjectPool.recycle(wrapper);
}

public Runnable wrapRunnable(Runnable delegate, TraceContext traceContext) {
if (delegate instanceof SpanInScopeRunnableWrapper) {
return delegate;
}
return runnableContextWrapperObjectPool.createInstance().wrap(delegate, traceContext);
}

public void recycle(InScopeRunnableWrapper wrapper) {
runnableWrapperObjectPool.recycle(wrapper);
public void recycle(ContextInScopeRunnableWrapper wrapper) {
runnableContextWrapperObjectPool.recycle(wrapper);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,54 +29,55 @@
import javax.annotation.Nullable;

@VisibleForAdvice
public class InScopeRunnableWrapper implements Runnable, Recyclable {
private final Logger logger = LoggerFactory.getLogger(InScopeRunnableWrapper.class);

public class SpanInScopeRunnableWrapper implements Runnable, Recyclable {
private static final Logger logger = LoggerFactory.getLogger(SpanInScopeRunnableWrapper.class);
private final ElasticApmTracer tracer;
@Nullable
private Runnable delegate;
private volatile Runnable delegate;
@Nullable
private AbstractSpan<?> span;
private volatile AbstractSpan<?> span;

private final ElasticApmTracer tracer;

InScopeRunnableWrapper(ElasticApmTracer tracer) {
SpanInScopeRunnableWrapper(ElasticApmTracer tracer) {
this.tracer = tracer;
}

public InScopeRunnableWrapper wrap(Runnable delegate, AbstractSpan<?> span) {
SpanInScopeRunnableWrapper wrap(Runnable delegate, AbstractSpan<?> span) {
this.delegate = delegate;
this.span = span;
return this;
}

// Exceptions in the agent may never affect the monitored application
// normally, advices act as the boundary of user and agent code and exceptions are handled via @Advice.OnMethodEnter(suppress = Throwable.class)
// In this case, this class acts as the boundary of user and agent code so we have to do the tedious exception handling here
@Override
public void run() {
if (span != null) {
// minimize volatile reads
AbstractSpan<?> localSpan = span;
if (localSpan != null) {
try {
span.activate();
} catch (Throwable throwable) {
localSpan.activate();
} catch (Throwable t) {
try {
logger.warn("Failed to activate span");
} catch (Throwable t) {
// do nothing, just never fail
logger.error("Unexpected error while activating span", t);
} catch (Throwable ignore) {
}
}
}

try {
//noinspection ConstantConditions
delegate.run();
// the span may be ended at this point
} finally {
try {
if (span != null) {
span.deactivate();
if (localSpan != null) {
localSpan.deactivate();
}
tracer.recycle(this);
} catch (Throwable throwable) {
} catch (Throwable t) {
try {
logger.warn("Failed to deactivate span or recycle");
} catch (Throwable t) {
// do nothing, just never fail
logger.error("Unexpected error while activating span", t);
} catch (Throwable ignore) {
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,16 @@ public boolean isChildOf(TraceContextHolder other) {
return getTraceContext().isChildOf(other);
}

/**
* Wraps the provided runnable and makes this {@link AbstractSpan} active in the {@link Runnable#run()} method.
*
* <p>
* Note: does activates the {@link AbstractSpan} and not only the {@link TraceContext}.
* This should only be used span is closed in thread the provided {@link Runnable} is executed in.
* </p>
*/
public Runnable withActiveSpan(Runnable runnable) {
return tracer.wrapRunnable(runnable, this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,44 +61,30 @@ protected TraceContextHolder(ElasticApmTracer tracer) {
public abstract boolean isChildOf(TraceContextHolder other);

public T activate() {
try {
tracer.activate(this);
List<SpanListener> spanListeners = tracer.getSpanListeners();
for (int i = 0; i < spanListeners.size(); i++) {
try {
spanListeners.get(i).onActivate(this);
} catch (Error e) {
throw e;
} catch (Throwable t) {
logger.warn("Exception while calling {}#onActivate", spanListeners.get(i).getClass().getSimpleName(), t);
}
}
} catch (Throwable t) {
tracer.activate(this);
List<SpanListener> spanListeners = tracer.getSpanListeners();
for (int i = 0; i < spanListeners.size(); i++) {
try {
logger.error("Unexpected error while activating context", t);
} catch (Throwable ignore) {
spanListeners.get(i).onActivate(this);
} catch (Error e) {
throw e;
} catch (Throwable t) {
logger.warn("Exception while calling {}#onActivate", spanListeners.get(i).getClass().getSimpleName(), t);
}
}
return (T) this;
}

public T deactivate() {
try {
tracer.deactivate(this);
List<SpanListener> spanListeners = tracer.getSpanListeners();
for (int i = 0; i < spanListeners.size(); i++) {
try {
spanListeners.get(i).onDeactivate(this);
} catch (Error e) {
throw e;
} catch (Throwable t) {
logger.warn("Exception while calling {}#onDeactivate", spanListeners.get(i).getClass().getSimpleName(), t);
}
}
} catch (Throwable t) {
tracer.deactivate(this);
List<SpanListener> spanListeners = tracer.getSpanListeners();
for (int i = 0; i < spanListeners.size(); i++) {
try {
logger.error("Unexpected error while activating context", t);
} catch (Throwable ignore) {
spanListeners.get(i).onDeactivate(this);
} catch (Error e) {
throw e;
} catch (Throwable t) {
logger.warn("Exception while calling {}#onDeactivate", spanListeners.get(i).getClass().getSimpleName(), t);
}
}
return (T) this;
Expand Down Expand Up @@ -131,4 +117,16 @@ public T captureException(@Nullable Throwable t) {
return (T) this;
}

/**
* Wraps the provided runnable and makes this {@link TraceContext} active in the {@link Runnable#run()} method.
*
* <p>
* Note: does not activate the {@link AbstractSpan} but only the {@link TraceContext}.
* This is useful if this span is closed in a different thread than the provided {@link Runnable} is executed in.
* </p>
*/
public Runnable withActiveContext(Runnable runnable) {
return tracer.wrapRunnable(runnable, getTraceContext());
}

}
14 changes: 14 additions & 0 deletions apm-agent-plugins/apm-java-concurrent-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-agent-plugins</artifactId>
<groupId>co.elastic.apm</groupId>
<version>1.3.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>apm-java-concurrent-plugin</artifactId>

</project>
Loading

0 comments on commit 51df3c4

Please sign in to comment.