Skip to content

Commit

Permalink
ExecutorService instrumentation (#430)
Browse files Browse the repository at this point in the history
closes #145
  • Loading branch information
felixbarny authored Jan 29, 2019
1 parent f254571 commit 9017aa9
Show file tree
Hide file tree
Showing 28 changed files with 1,038 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Features
* Added support for sync calls of OkHttp client
* Added support for context propagation for `java.util.concurrent.ExecutorService`s
* The `trace_methods` configuration now allows to omit the method matcher.
Example: `com.example.*` traces all classes and methods within the `com.example` package and sub-packages.
* Added support for JSF. Tested on WildFly, WebSphere Liberty and Payara with embedded JSF implementation and on Tomcat and Jetty with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private static AgentBuilder getAgentBuilder(final ByteBuddy byteBuddy, final Cor
: AgentBuilder.PoolStrategy.Default.FAST)
.ignore(any(), isReflectionClassLoader())
.or(any(), classLoaderWithName("org.codehaus.groovy.runtime.callsite.CallSiteClassLoader"))
.or(nameStartsWith("java."))
.or(nameStartsWith("java.").and(not(nameStartsWith("java.util.concurrent."))))
.or(nameStartsWith("com.sun.").and(not(nameStartsWith("com.sun.faces."))))
.or(nameStartsWith("sun"))
.or(nameStartsWith("org.aspectj."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package co.elastic.apm.agent.bci;

import co.elastic.apm.agent.impl.ElasticApmTracer;
import co.elastic.apm.agent.impl.transaction.TraceContextHolder;
import net.bytebuddy.description.NamedElement;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -59,6 +60,14 @@ static void staticInit(ElasticApmTracer tracer) {
ElasticApmInstrumentation.tracer = tracer;
}

@Nullable
public static TraceContextHolder<?> getActive() {
if (tracer != null) {
return tracer.getActive();
}
return null;
}

public void init(ElasticApmTracer tracer) {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*-
* #%L
* Elastic APM Java agent
* %%
* Copyright (C) 2018-2019 Elastic and contributors
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package co.elastic.apm.agent.impl;


import co.elastic.apm.agent.bci.VisibleForAdvice;
import co.elastic.apm.agent.impl.transaction.TraceContext;
import co.elastic.apm.agent.objectpool.Recyclable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.concurrent.Callable;

@VisibleForAdvice
public class ContextInScopeCallableWrapper<V> implements Callable<V>, Recyclable {
private static final Logger logger = LoggerFactory.getLogger(ContextInScopeCallableWrapper.class);
private final ElasticApmTracer tracer;
private final TraceContext context;
@Nullable
private volatile Callable<V> delegate;

ContextInScopeCallableWrapper(ElasticApmTracer tracer) {
this.tracer = tracer;
context = TraceContext.with64BitId(tracer);
}

ContextInScopeCallableWrapper<V> wrap(Callable<V> delegate, TraceContext context) {
this.context.copyFrom(context);
// ordering is important: volatile write has to be after copying the TraceContext to ensure visibility in #run
this.delegate = delegate;
return this;
}

// Exceptions in the agent may never affect the monitored application
// normally, advices act as the boundary of user and agent code and exceptions are handled via @Advice.OnMethodEnter(suppress = Throwable.class)
// In this case, this class acts as the boundary of user and agent code so we have to do the tedious exception handling here
@Override
public V call() throws Exception {
try {
context.activate();
} catch (Throwable t) {
try {
logger.error("Unexpected error while activating span", t);
} catch (Throwable ignore) {
}
}
try {
//noinspection ConstantConditions
return delegate.call();
} finally {
try {
context.deactivate();
tracer.recycle(this);
} catch (Throwable t) {
try {
logger.error("Unexpected error while deactivating or recycling span", t);
} catch (Throwable ignore) {
}
}
}
}

@Override
public void resetState() {
context.resetState();
delegate = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*-
* #%L
* Elastic APM Java agent
* %%
* Copyright (C) 2018-2019 Elastic and contributors
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package co.elastic.apm.agent.impl;


import co.elastic.apm.agent.bci.VisibleForAdvice;
import co.elastic.apm.agent.impl.transaction.TraceContext;
import co.elastic.apm.agent.objectpool.Recyclable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

@VisibleForAdvice
public class ContextInScopeRunnableWrapper implements Runnable, Recyclable {
private static final Logger logger = LoggerFactory.getLogger(ContextInScopeRunnableWrapper.class);
private final ElasticApmTracer tracer;
private final TraceContext context;
@Nullable
private volatile Runnable delegate;

ContextInScopeRunnableWrapper(ElasticApmTracer tracer) {
this.tracer = tracer;
context = TraceContext.with64BitId(tracer);
}

ContextInScopeRunnableWrapper wrap(Runnable delegate, TraceContext context) {
this.context.copyFrom(context);
// ordering is important: volatile write has to be after copying the TraceContext to ensure visibility in #run
this.delegate = delegate;
return this;
}

// Exceptions in the agent may never affect the monitored application
// normally, advices act as the boundary of user and agent code and exceptions are handled via @Advice.OnMethodEnter(suppress = Throwable.class)
// In this case, this class acts as the boundary of user and agent code so we have to do the tedious exception handling here
@Override
public void run() {
try {
context.activate();
} catch (Throwable t) {
try {
logger.error("Unexpected error while activating span", t);
} catch (Throwable ignore) {
}
}
try {
//noinspection ConstantConditions
delegate.run();
} finally {
try {
context.deactivate();
tracer.recycle(this);
} catch (Throwable t) {
try {
logger.error("Unexpected error while deactivating or recycling span", t);
} catch (Throwable ignore) {
}
}
}
}

@Override
public void resetState() {
context.resetState();
delegate = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Callable;

import static org.jctools.queues.spec.ConcurrentQueueSpec.createBoundedMpmc;

Expand All @@ -59,13 +60,22 @@
public class ElasticApmTracer {
private static final Logger logger = LoggerFactory.getLogger(ElasticApmTracer.class);

/**
* The number of required {@link Runnable} wrappers does not depend on the size of the disruptor
* but rather on the amount of application threads.
* The requirement increases if the application tends to wrap multiple {@link Runnable}s.
*/
private static final int MAX_POOLED_RUNNABLES = 256;

private final ConfigurationRegistry configurationRegistry;
private final StacktraceConfiguration stacktraceConfiguration;
private final Iterable<LifecycleListener> lifecycleListeners;
private final ObjectPool<Transaction> transactionPool;
private final ObjectPool<Span> spanPool;
private final ObjectPool<ErrorCapture> errorPool;
private final ObjectPool<InScopeRunnableWrapper> runnableWrapperObjectPool;
private final ObjectPool<SpanInScopeRunnableWrapper> runnableSpanWrapperObjectPool;
private final ObjectPool<ContextInScopeRunnableWrapper> runnableContextWrapperObjectPool;
private final ObjectPool<ContextInScopeCallableWrapper<?>> callableContextWrapperObjectPool;
private final Reporter reporter;
// Maintains a stack of all the activated spans
// This way its easy to retrieve the bottom of the stack (the transaction)
Expand Down Expand Up @@ -112,11 +122,27 @@ public ErrorCapture createInstance() {
return new ErrorCapture(ElasticApmTracer.this);
}
});
runnableWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<InScopeRunnableWrapper>newQueue(createBoundedMpmc(maxPooledElements)), false,
new Allocator<InScopeRunnableWrapper>() {
// consider specialized object pools which return the objects to the thread-local pool of their originating thread
// with a combination of DetachedThreadLocal and org.jctools.queues.MpscRelaxedArrayQueue
runnableSpanWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<SpanInScopeRunnableWrapper>newQueue(createBoundedMpmc(MAX_POOLED_RUNNABLES)), false,
new Allocator<SpanInScopeRunnableWrapper>() {
@Override
public SpanInScopeRunnableWrapper createInstance() {
return new SpanInScopeRunnableWrapper(ElasticApmTracer.this);
}
});
runnableContextWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<ContextInScopeRunnableWrapper>newQueue(createBoundedMpmc(MAX_POOLED_RUNNABLES)), false,
new Allocator<ContextInScopeRunnableWrapper>() {
@Override
public InScopeRunnableWrapper createInstance() {
return new InScopeRunnableWrapper(ElasticApmTracer.this);
public ContextInScopeRunnableWrapper createInstance() {
return new ContextInScopeRunnableWrapper(ElasticApmTracer.this);
}
});
callableContextWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<ContextInScopeCallableWrapper<?>>newQueue(createBoundedMpmc(MAX_POOLED_RUNNABLES)), false,
new Allocator<ContextInScopeCallableWrapper<?>>() {
@Override
public ContextInScopeCallableWrapper<?> createInstance() {
return new ContextInScopeCallableWrapper<>(ElasticApmTracer.this);
}
});
sampler = ProbabilitySampler.of(coreConfiguration.getSampleRate().get());
Expand Down Expand Up @@ -302,11 +328,36 @@ public void recycle(ErrorCapture error) {
}

public Runnable wrapRunnable(Runnable delegate, AbstractSpan<?> span) {
return runnableWrapperObjectPool.createInstance().wrap(delegate, span);
if (delegate instanceof SpanInScopeRunnableWrapper) {
return delegate;
}
return runnableSpanWrapperObjectPool.createInstance().wrap(delegate, span);
}

public void recycle(SpanInScopeRunnableWrapper wrapper) {
runnableSpanWrapperObjectPool.recycle(wrapper);
}

public Runnable wrapRunnable(Runnable delegate, TraceContext traceContext) {
if (delegate instanceof ContextInScopeRunnableWrapper) {
return delegate;
}
return runnableContextWrapperObjectPool.createInstance().wrap(delegate, traceContext);
}

public void recycle(ContextInScopeRunnableWrapper wrapper) {
runnableContextWrapperObjectPool.recycle(wrapper);
}

public <V> Callable<V> wrapCallable(Callable<V> delegate, TraceContext traceContext) {
if (delegate instanceof ContextInScopeCallableWrapper) {
return delegate;
}
return ((ContextInScopeCallableWrapper<V>) callableContextWrapperObjectPool.createInstance()).wrap(delegate, traceContext);
}

public void recycle(InScopeRunnableWrapper wrapper) {
runnableWrapperObjectPool.recycle(wrapper);
public void recycle(ContextInScopeCallableWrapper<?> callableWrapper) {
callableContextWrapperObjectPool.recycle(callableWrapper);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,54 +29,55 @@
import javax.annotation.Nullable;

@VisibleForAdvice
public class InScopeRunnableWrapper implements Runnable, Recyclable {
private final Logger logger = LoggerFactory.getLogger(InScopeRunnableWrapper.class);

public class SpanInScopeRunnableWrapper implements Runnable, Recyclable {
private static final Logger logger = LoggerFactory.getLogger(SpanInScopeRunnableWrapper.class);
private final ElasticApmTracer tracer;
@Nullable
private Runnable delegate;
private volatile Runnable delegate;
@Nullable
private AbstractSpan<?> span;
private volatile AbstractSpan<?> span;

private final ElasticApmTracer tracer;

InScopeRunnableWrapper(ElasticApmTracer tracer) {
SpanInScopeRunnableWrapper(ElasticApmTracer tracer) {
this.tracer = tracer;
}

public InScopeRunnableWrapper wrap(Runnable delegate, AbstractSpan<?> span) {
SpanInScopeRunnableWrapper wrap(Runnable delegate, AbstractSpan<?> span) {
this.delegate = delegate;
this.span = span;
return this;
}

// Exceptions in the agent may never affect the monitored application
// normally, advices act as the boundary of user and agent code and exceptions are handled via @Advice.OnMethodEnter(suppress = Throwable.class)
// In this case, this class acts as the boundary of user and agent code so we have to do the tedious exception handling here
@Override
public void run() {
if (span != null) {
// minimize volatile reads
AbstractSpan<?> localSpan = span;
if (localSpan != null) {
try {
span.activate();
} catch (Throwable throwable) {
localSpan.activate();
} catch (Throwable t) {
try {
logger.warn("Failed to activate span");
} catch (Throwable t) {
// do nothing, just never fail
logger.error("Unexpected error while activating span", t);
} catch (Throwable ignore) {
}
}
}

try {
//noinspection ConstantConditions
delegate.run();
// the span may be ended at this point
} finally {
try {
if (span != null) {
span.deactivate();
if (localSpan != null) {
localSpan.deactivate();
}
tracer.recycle(this);
} catch (Throwable throwable) {
} catch (Throwable t) {
try {
logger.warn("Failed to deactivate span or recycle");
} catch (Throwable t) {
// do nothing, just never fail
logger.error("Unexpected error while deactivating or recycling span", t);
} catch (Throwable ignore) {
}
}
}
Expand Down
Loading

0 comments on commit 9017aa9

Please sign in to comment.