Skip to content

Commit

Permalink
GH-9743: Add observation to the SourcePollingChannelAdapter
Browse files Browse the repository at this point in the history
Fixes: #9743

Spring Integration provides observation for the `MessageChannel`, `MessageHandler`
and `MessageProducerSupport`.
The `SourcePollingChannelAdapter` is missing, and it is that only special endpoint which
deals with `MessageSource` implementations via scheduled tasks in the poller.
Essentially, this endpoint is a start of the flow, but it still is a consumer of data from the source system.

* Add an `Observation` logic to the `SourcePollingChannelAdapter`.
* Divide it into two phases: start (and open scope) when message is received; stop (and close scope) when the whole polling task for a message is done.
We need this separation because of transaction scope for the polling task.
At the same time we don't want to emit an observation for a void polling task.
* Change `MessageReceiverContext` to accept a `handlerType`.
The `MessageHandler` contributes a `handler`.
The new support in the `SourcePollingChannelAdapter` - `message-source`.
And change `MessageProducerSupport` to contribute a `message-producer`
* Verify the single trace is supported for the whole flow (including transaction synchronization) starting from a `SourcePollingChannelAdapter` in a new `SourcePollingChannelAdapterObservationTests`
* Document this new feature
  • Loading branch information
artembilan committed Jan 9, 2025
1 parent 177bda5 commit 89c5dfa
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,6 +51,7 @@
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
Expand Down Expand Up @@ -416,10 +417,12 @@ private Flux<Message<?>> createFluxGenerator() {
}

private Message<?> pollForMessage() {
Exception pollingTaskError = null;
try {
return this.pollingTask.call();
}
catch (Exception ex) {
pollingTaskError = ex;
if (ex instanceof MessagingException) { // NOSONAR
throw (MessagingException) ex;
}
Expand All @@ -441,6 +444,7 @@ private Message<?> pollForMessage() {
TransactionSynchronizationManager.unbindResource(resource);
}
}
donePollingTask(pollingTaskError);
}
}

Expand Down Expand Up @@ -471,7 +475,7 @@ private Message<?> doPoll() {
return message;
}

private void messageReceived(IntegrationResourceHolder holder, Message<?> message) {
protected void messageReceived(@Nullable IntegrationResourceHolder holder, Message<?> message) {
this.logger.debug(() -> "Poll resulted in Message: " + message);
if (holder != null) {
holder.setMessage(message);
Expand All @@ -490,6 +494,10 @@ private void messageReceived(IntegrationResourceHolder holder, Message<?> messag
}
}

protected void donePollingTask(@Nullable Exception pollingTaskError) {

}

@Override // guarded by super#lifecycleLock
protected void doStop() {
if (this.runningTask != null) {
Expand Down Expand Up @@ -536,6 +544,7 @@ protected String getResourceKey() {
return null;
}

@Nullable
private IntegrationResourceHolder bindResourceHolderIfNecessary(String key, Object resource) {
if (this.transactionSynchronizationFactory != null && resource != null &&
TransactionSynchronizationManager.isActualTransactionActive()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -256,7 +256,7 @@ protected void sendMessage(Message<?> message) {
IntegrationObservation.HANDLER.observation(
this.observationConvention,
DefaultMessageReceiverObservationConvention.INSTANCE,
() -> new MessageReceiverContext(message, getComponentName()),
() -> new MessageReceiverContext(message, getComponentName(), "message-producer"),
this.observationRegistry)
.observe(() -> this.messagingTemplate.send(getRequiredOutputChannel(), trackMessageIfAny(message)));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,8 +19,12 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

import org.springframework.aop.framework.Advised;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.Lifecycle;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AckUtils;
Expand All @@ -31,16 +35,21 @@
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.support.context.NamedComponent;
import org.springframework.integration.support.management.IntegrationManagement;
import org.springframework.integration.support.management.TrackableComponent;
import org.springframework.integration.support.management.observation.DefaultMessageReceiverObservationConvention;
import org.springframework.integration.support.management.observation.IntegrationObservation;
import org.springframework.integration.support.management.observation.MessageReceiverContext;
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
import org.springframework.integration.transaction.IntegrationResourceHolder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

/**
* A Channel Adapter implementation for connecting a
* {@link MessageSource} to a {@link MessageChannel}.
* A Channel Adapter implementation for connecting a {@link MessageSource} to a {@link MessageChannel}.
*
* @author Mark Fisher
* @author Oleg Zhurakousky
Expand All @@ -49,12 +58,16 @@
* @author Christian Tzolov
*/
public class SourcePollingChannelAdapter extends AbstractPollingEndpoint
implements TrackableComponent {
implements TrackableComponent, IntegrationManagement {

private final MessagingTemplate messagingTemplate = new MessagingTemplate();

private MessageSource<?> originalSource;

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

private MessageReceiverObservationConvention observationConvention;

private volatile MessageSource<?> source;

private volatile MessageChannel outputChannel;
Expand All @@ -67,7 +80,6 @@ public class SourcePollingChannelAdapter extends AbstractPollingEndpoint

/**
* Specify the source to be polled for Messages.
*
* @param source The message source.
*/
public void setSource(MessageSource<?> source) {
Expand All @@ -76,14 +88,13 @@ public void setSource(MessageSource<?> source) {
Object target = extractProxyTarget(source);
this.originalSource = target != null ? (MessageSource<?>) target : source;

if (source instanceof ExpressionCapable) {
setPrimaryExpression(((ExpressionCapable) source).getExpression());
if (source instanceof ExpressionCapable expressionCapable) {
setPrimaryExpression(expressionCapable.getExpression());
}
}

/**
* Specify the {@link MessageChannel} where Messages should be sent.
*
* @param outputChannel The output channel.
*/
public void setOutputChannel(MessageChannel outputChannel) {
Expand All @@ -105,9 +116,7 @@ public void setOutputChannelName(String outputChannelName) {
}

/**
* Specify the maximum time to wait for a Message to be sent to the
* output channel.
*
* Specify the maximum time to wait for a Message to be sent to the output channel.
* @param sendTimeout The send timeout.
*/
public void setSendTimeout(long sendTimeout) {
Expand All @@ -116,18 +125,38 @@ public void setSendTimeout(long sendTimeout) {

/**
* Specify whether this component should be tracked in the Message History.
*
* @param shouldTrack true if the component should be tracked.
*/
@Override
public void setShouldTrack(boolean shouldTrack) {
this.shouldTrack = shouldTrack;
}

@Override
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

/**
* Set a custom {@link MessageReceiverObservationConvention} for {@link IntegrationObservation#HANDLER}.
* Ignored if an {@link ObservationRegistry} is not configured for this component.
* @param observationConvention the {@link MessageReceiverObservationConvention} to use.
* @since 6.5
*/
public void setObservationConvention(@Nullable MessageReceiverObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

@Override
public boolean isObserved() {
return !ObservationRegistry.NOOP.equals(this.observationRegistry);
}

@Override
public String getComponentType() {
return (this.source instanceof NamedComponent) ?
((NamedComponent) this.source).getComponentType() : "inbound-channel-adapter";
return (this.source instanceof NamedComponent namedComponent)
? namedComponent.getComponentType()
: "inbound-channel-adapter";
}

@Override
Expand All @@ -147,8 +176,8 @@ protected final void setReceiveMessageSource(Object source) {

@Override
protected void doStart() {
if (this.source instanceof Lifecycle) {
((Lifecycle) this.source).start();
if (this.source instanceof Lifecycle lifecycle) {
lifecycle.start();
}
super.doStart();

Expand All @@ -160,8 +189,8 @@ protected void doStart() {
@Override
protected void doStop() {
super.doStop();
if (this.source instanceof Lifecycle) {
((Lifecycle) this.source).stop();
if (this.source instanceof Lifecycle lifecycle) {
lifecycle.stop();
}
}

Expand All @@ -172,8 +201,9 @@ protected void onInit() {
|| (this.outputChannelName != null && this.outputChannel == null),
"One and only one of 'outputChannelName' or 'outputChannel' is required.");
super.onInit();
if (this.getBeanFactory() != null) {
this.messagingTemplate.setBeanFactory(this.getBeanFactory());
BeanFactory beanFactory = getBeanFactory();
if (beanFactory != null) {
this.messagingTemplate.setBeanFactory(beanFactory);
}
}

Expand Down Expand Up @@ -204,13 +234,13 @@ protected void handleMessage(Message<?> messageArg) {
this.messagingTemplate.send(getOutputChannel(), message);
AckUtils.autoAck(ackCallback);
}
catch (Exception e) {
catch (Exception ex) {
AckUtils.autoNack(ackCallback);
if (e instanceof MessagingException) { // NOSONAR
throw (MessagingException) e;
if (ex instanceof MessagingException messagingException) { // NOSONAR
throw messagingException;
}
else {
throw new MessagingException(message, "Failed to send Message", e);
throw new MessagingException(message, "Failed to send Message", ex);
}
}
}
Expand All @@ -220,6 +250,41 @@ protected Message<?> receiveMessage() {
return this.source.receive();
}

/**
* Start an observation (and open scope) for the received message.
* @param holder the resource holder for this component.
* @param message the received message.
*/
@Override
protected void messageReceived(@Nullable IntegrationResourceHolder holder, Message<?> message) {
Observation observation =
IntegrationObservation.HANDLER.observation(this.observationConvention,
DefaultMessageReceiverObservationConvention.INSTANCE,
() -> new MessageReceiverContext(message, getComponentName(), "message-source"),
this.observationRegistry);

observation.start().openScope();
super.messageReceived(holder, message);
}

/**
* Stop an observation (and close its scope) previously started
* from the {@link #messageReceived(IntegrationResourceHolder, Message)}.
* @param pollingTaskError an optional error as a result of the polling task.
*/
@Override
protected void donePollingTask(@Nullable Exception pollingTaskError) {
Observation.Scope currentObservationScope = this.observationRegistry.getCurrentObservationScope();
if (currentObservationScope != null) {
currentObservationScope.close();
Observation currentObservation = currentObservationScope.getCurrentObservation();
if (pollingTaskError != null) {
currentObservation.error(pollingTaskError);
}
currentObservation.stop();
}
}

@Override
protected Object getResourceToBind() {
return this.originalSource;
Expand All @@ -230,16 +295,16 @@ protected String getResourceKey() {
return IntegrationResourceHolder.MESSAGE_SOURCE;
}

@Nullable
private static Object extractProxyTarget(Object target) {
if (!(target instanceof Advised)) {
if (!(target instanceof Advised advised)) {
return target;
}
Advised advised = (Advised) target;
try {
return extractProxyTarget(advised.getTargetSource().getTarget());
}
catch (Exception e) {
throw new BeanCreationException("Could not extract target", e);
catch (Exception ex) {
throw new BeanCreationException("Could not extract target", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,7 +40,7 @@ public KeyValues getLowCardinalityKeyValues(MessageReceiverContext context) {
// See IntegrationObservation.HandlerTags.COMPONENT_NAME - to avoid class tangle
.of("spring.integration.name", context.getHandlerName())
// See IntegrationObservation.HandlerTags.COMPONENT_TYPE - to avoid class tangle
.and("spring.integration.type", "handler");
.and("spring.integration.type", context.getHandlerType());
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2024 the original author or authors.
* Copyright 2022-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,10 +36,24 @@ public class MessageReceiverContext extends ReceiverContext<Message<?>> {

private final String handlerName;

private final String handlerType;

public MessageReceiverContext(Message<?> message, @Nullable String handlerName) {
this(message, handlerName, "handler");
}

/**
* Construct an instance based on the message, the handler (or source, producer) bean name and handler type.
* @param message the received message for this context.
* @param handlerName the handler (or source, producer) bean name processing the message.
* @param handlerType the handler type: {@code handler}, or {@code message-source}, or {@code message-producer}.
* @since 6.5
*/
public MessageReceiverContext(Message<?> message, @Nullable String handlerName, String handlerType) {
super(MessageReceiverContext::getHeader);
this.message = message;
this.handlerName = handlerName != null ? handlerName : "unknown";
this.handlerType = handlerType;
}

@Override
Expand All @@ -51,6 +65,10 @@ public String getHandlerName() {
return this.handlerName;
}

public String getHandlerType() {
return this.handlerType;
}

@Nullable
private static String getHeader(Message<?> message, String key) {
Object value = message.getHeaders().get(key);
Expand Down
Loading

0 comments on commit 89c5dfa

Please sign in to comment.