From 89c5dfa4d6de7dc8072001f0a4740ff1b3bb9092 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 9 Jan 2025 13:31:02 -0500 Subject: [PATCH] GH-9743: Add observation to the `SourcePollingChannelAdapter` Fixes: https://github.com/spring-projects/spring-integration/issues/9743 Spring Integration provides observation for the `MessageChannel`, `MessageHandler` and `MessageProducerSupport`. The `SourcePollingChannelAdapter` is missing, and it is that only special endpoint which deals with `MessageSource` implementations via scheduled tasks in the poller. Essentially, this endpoint is a start of the flow, but it still is a consumer of data from the source system. * Add an `Observation` logic to the `SourcePollingChannelAdapter`. * Divide it into two phases: start (and open scope) when message is received; stop (and close scope) when the whole polling task for a message is done. We need this separation because of transaction scope for the polling task. At the same time we don't want to emit an observation for a void polling task. * Change `MessageReceiverContext` to accept a `handlerType`. The `MessageHandler` contributes a `handler`. The new support in the `SourcePollingChannelAdapter` - `message-source`. And change `MessageProducerSupport` to contribute a `message-producer` * Verify the single trace is supported for the whole flow (including transaction synchronization) starting from a `SourcePollingChannelAdapter` in a new `SourcePollingChannelAdapterObservationTests` * Document this new feature --- .../endpoint/AbstractPollingEndpoint.java | 13 +- .../endpoint/MessageProducerSupport.java | 4 +- .../endpoint/SourcePollingChannelAdapter.java | 121 ++++++++++---- ...tMessageReceiverObservationConvention.java | 4 +- .../observation/MessageReceiverContext.java | 20 ++- ...PollingChannelAdapterObservationTests.java | 156 ++++++++++++++++++ .../antora/modules/ROOT/pages/metrics.adoc | 7 +- .../antora/modules/ROOT/pages/whats-new.adoc | 9 +- 8 files changed, 295 insertions(+), 39 deletions(-) create mode 100644 spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/SourcePollingChannelAdapterObservationTests.java diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java index a948dd553b7..c3b7860e469 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +51,7 @@ import org.springframework.integration.transaction.TransactionSynchronizationFactory; import org.springframework.integration.util.ErrorHandlingTaskExecutor; import org.springframework.jmx.export.annotation.ManagedAttribute; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; @@ -416,10 +417,12 @@ private Flux> createFluxGenerator() { } private Message pollForMessage() { + Exception pollingTaskError = null; try { return this.pollingTask.call(); } catch (Exception ex) { + pollingTaskError = ex; if (ex instanceof MessagingException) { // NOSONAR throw (MessagingException) ex; } @@ -441,6 +444,7 @@ private Message pollForMessage() { TransactionSynchronizationManager.unbindResource(resource); } } + donePollingTask(pollingTaskError); } } @@ -471,7 +475,7 @@ private Message doPoll() { return message; } - private void messageReceived(IntegrationResourceHolder holder, Message message) { + protected void messageReceived(@Nullable IntegrationResourceHolder holder, Message message) { this.logger.debug(() -> "Poll resulted in Message: " + message); if (holder != null) { holder.setMessage(message); @@ -490,6 +494,10 @@ private void messageReceived(IntegrationResourceHolder holder, Message messag } } + protected void donePollingTask(@Nullable Exception pollingTaskError) { + + } + @Override // guarded by super#lifecycleLock protected void doStop() { if (this.runningTask != null) { @@ -536,6 +544,7 @@ protected String getResourceKey() { return null; } + @Nullable private IntegrationResourceHolder bindResourceHolderIfNecessary(String key, Object resource) { if (this.transactionSynchronizationFactory != null && resource != null && TransactionSynchronizationManager.isActualTransactionActive()) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java index ffed5aafc84..42c8bcf6cc1 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -256,7 +256,7 @@ protected void sendMessage(Message message) { IntegrationObservation.HANDLER.observation( this.observationConvention, DefaultMessageReceiverObservationConvention.INSTANCE, - () -> new MessageReceiverContext(message, getComponentName()), + () -> new MessageReceiverContext(message, getComponentName(), "message-producer"), this.observationRegistry) .observe(() -> this.messagingTemplate.send(getRequiredOutputChannel(), trackMessageIfAny(message))); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java index 1d2f270f58b..15fad23d68f 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,8 +19,12 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; + import org.springframework.aop.framework.Advised; import org.springframework.beans.factory.BeanCreationException; +import org.springframework.beans.factory.BeanFactory; import org.springframework.context.Lifecycle; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.acks.AckUtils; @@ -31,16 +35,21 @@ import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.history.MessageHistory; import org.springframework.integration.support.context.NamedComponent; +import org.springframework.integration.support.management.IntegrationManagement; import org.springframework.integration.support.management.TrackableComponent; +import org.springframework.integration.support.management.observation.DefaultMessageReceiverObservationConvention; +import org.springframework.integration.support.management.observation.IntegrationObservation; +import org.springframework.integration.support.management.observation.MessageReceiverContext; +import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention; import org.springframework.integration.transaction.IntegrationResourceHolder; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; import org.springframework.util.Assert; /** - * A Channel Adapter implementation for connecting a - * {@link MessageSource} to a {@link MessageChannel}. + * A Channel Adapter implementation for connecting a {@link MessageSource} to a {@link MessageChannel}. * * @author Mark Fisher * @author Oleg Zhurakousky @@ -49,12 +58,16 @@ * @author Christian Tzolov */ public class SourcePollingChannelAdapter extends AbstractPollingEndpoint - implements TrackableComponent { + implements TrackableComponent, IntegrationManagement { private final MessagingTemplate messagingTemplate = new MessagingTemplate(); private MessageSource originalSource; + private ObservationRegistry observationRegistry = ObservationRegistry.NOOP; + + private MessageReceiverObservationConvention observationConvention; + private volatile MessageSource source; private volatile MessageChannel outputChannel; @@ -67,7 +80,6 @@ public class SourcePollingChannelAdapter extends AbstractPollingEndpoint /** * Specify the source to be polled for Messages. - * * @param source The message source. */ public void setSource(MessageSource source) { @@ -76,14 +88,13 @@ public void setSource(MessageSource source) { Object target = extractProxyTarget(source); this.originalSource = target != null ? (MessageSource) target : source; - if (source instanceof ExpressionCapable) { - setPrimaryExpression(((ExpressionCapable) source).getExpression()); + if (source instanceof ExpressionCapable expressionCapable) { + setPrimaryExpression(expressionCapable.getExpression()); } } /** * Specify the {@link MessageChannel} where Messages should be sent. - * * @param outputChannel The output channel. */ public void setOutputChannel(MessageChannel outputChannel) { @@ -105,9 +116,7 @@ public void setOutputChannelName(String outputChannelName) { } /** - * Specify the maximum time to wait for a Message to be sent to the - * output channel. - * + * Specify the maximum time to wait for a Message to be sent to the output channel. * @param sendTimeout The send timeout. */ public void setSendTimeout(long sendTimeout) { @@ -116,7 +125,6 @@ public void setSendTimeout(long sendTimeout) { /** * Specify whether this component should be tracked in the Message History. - * * @param shouldTrack true if the component should be tracked. */ @Override @@ -124,10 +132,31 @@ public void setShouldTrack(boolean shouldTrack) { this.shouldTrack = shouldTrack; } + @Override + public void registerObservationRegistry(ObservationRegistry observationRegistry) { + this.observationRegistry = observationRegistry; + } + + /** + * Set a custom {@link MessageReceiverObservationConvention} for {@link IntegrationObservation#HANDLER}. + * Ignored if an {@link ObservationRegistry} is not configured for this component. + * @param observationConvention the {@link MessageReceiverObservationConvention} to use. + * @since 6.5 + */ + public void setObservationConvention(@Nullable MessageReceiverObservationConvention observationConvention) { + this.observationConvention = observationConvention; + } + + @Override + public boolean isObserved() { + return !ObservationRegistry.NOOP.equals(this.observationRegistry); + } + @Override public String getComponentType() { - return (this.source instanceof NamedComponent) ? - ((NamedComponent) this.source).getComponentType() : "inbound-channel-adapter"; + return (this.source instanceof NamedComponent namedComponent) + ? namedComponent.getComponentType() + : "inbound-channel-adapter"; } @Override @@ -147,8 +176,8 @@ protected final void setReceiveMessageSource(Object source) { @Override protected void doStart() { - if (this.source instanceof Lifecycle) { - ((Lifecycle) this.source).start(); + if (this.source instanceof Lifecycle lifecycle) { + lifecycle.start(); } super.doStart(); @@ -160,8 +189,8 @@ protected void doStart() { @Override protected void doStop() { super.doStop(); - if (this.source instanceof Lifecycle) { - ((Lifecycle) this.source).stop(); + if (this.source instanceof Lifecycle lifecycle) { + lifecycle.stop(); } } @@ -172,8 +201,9 @@ protected void onInit() { || (this.outputChannelName != null && this.outputChannel == null), "One and only one of 'outputChannelName' or 'outputChannel' is required."); super.onInit(); - if (this.getBeanFactory() != null) { - this.messagingTemplate.setBeanFactory(this.getBeanFactory()); + BeanFactory beanFactory = getBeanFactory(); + if (beanFactory != null) { + this.messagingTemplate.setBeanFactory(beanFactory); } } @@ -204,13 +234,13 @@ protected void handleMessage(Message messageArg) { this.messagingTemplate.send(getOutputChannel(), message); AckUtils.autoAck(ackCallback); } - catch (Exception e) { + catch (Exception ex) { AckUtils.autoNack(ackCallback); - if (e instanceof MessagingException) { // NOSONAR - throw (MessagingException) e; + if (ex instanceof MessagingException messagingException) { // NOSONAR + throw messagingException; } else { - throw new MessagingException(message, "Failed to send Message", e); + throw new MessagingException(message, "Failed to send Message", ex); } } } @@ -220,6 +250,41 @@ protected Message receiveMessage() { return this.source.receive(); } + /** + * Start an observation (and open scope) for the received message. + * @param holder the resource holder for this component. + * @param message the received message. + */ + @Override + protected void messageReceived(@Nullable IntegrationResourceHolder holder, Message message) { + Observation observation = + IntegrationObservation.HANDLER.observation(this.observationConvention, + DefaultMessageReceiverObservationConvention.INSTANCE, + () -> new MessageReceiverContext(message, getComponentName(), "message-source"), + this.observationRegistry); + + observation.start().openScope(); + super.messageReceived(holder, message); + } + + /** + * Stop an observation (and close its scope) previously started + * from the {@link #messageReceived(IntegrationResourceHolder, Message)}. + * @param pollingTaskError an optional error as a result of the polling task. + */ + @Override + protected void donePollingTask(@Nullable Exception pollingTaskError) { + Observation.Scope currentObservationScope = this.observationRegistry.getCurrentObservationScope(); + if (currentObservationScope != null) { + currentObservationScope.close(); + Observation currentObservation = currentObservationScope.getCurrentObservation(); + if (pollingTaskError != null) { + currentObservation.error(pollingTaskError); + } + currentObservation.stop(); + } + } + @Override protected Object getResourceToBind() { return this.originalSource; @@ -230,16 +295,16 @@ protected String getResourceKey() { return IntegrationResourceHolder.MESSAGE_SOURCE; } + @Nullable private static Object extractProxyTarget(Object target) { - if (!(target instanceof Advised)) { + if (!(target instanceof Advised advised)) { return target; } - Advised advised = (Advised) target; try { return extractProxyTarget(advised.getTargetSource().getTarget()); } - catch (Exception e) { - throw new BeanCreationException("Could not extract target", e); + catch (Exception ex) { + throw new BeanCreationException("Could not extract target", ex); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/DefaultMessageReceiverObservationConvention.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/DefaultMessageReceiverObservationConvention.java index b0034a98ba8..b6c8a028730 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/DefaultMessageReceiverObservationConvention.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/DefaultMessageReceiverObservationConvention.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,7 +40,7 @@ public KeyValues getLowCardinalityKeyValues(MessageReceiverContext context) { // See IntegrationObservation.HandlerTags.COMPONENT_NAME - to avoid class tangle .of("spring.integration.name", context.getHandlerName()) // See IntegrationObservation.HandlerTags.COMPONENT_TYPE - to avoid class tangle - .and("spring.integration.type", "handler"); + .and("spring.integration.type", context.getHandlerType()); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageReceiverContext.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageReceiverContext.java index c7b26164a4a..551c478d8f1 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageReceiverContext.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageReceiverContext.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2024 the original author or authors. + * Copyright 2022-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,10 +36,24 @@ public class MessageReceiverContext extends ReceiverContext> { private final String handlerName; + private final String handlerType; + public MessageReceiverContext(Message message, @Nullable String handlerName) { + this(message, handlerName, "handler"); + } + + /** + * Construct an instance based on the message, the handler (or source, producer) bean name and handler type. + * @param message the received message for this context. + * @param handlerName the handler (or source, producer) bean name processing the message. + * @param handlerType the handler type: {@code handler}, or {@code message-source}, or {@code message-producer}. + * @since 6.5 + */ + public MessageReceiverContext(Message message, @Nullable String handlerName, String handlerType) { super(MessageReceiverContext::getHeader); this.message = message; this.handlerName = handlerName != null ? handlerName : "unknown"; + this.handlerType = handlerType; } @Override @@ -51,6 +65,10 @@ public String getHandlerName() { return this.handlerName; } + public String getHandlerType() { + return this.handlerType; + } + @Nullable private static String getHeader(Message message, String key) { Object value = message.getHeaders().get(key); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/SourcePollingChannelAdapterObservationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/SourcePollingChannelAdapterObservationTests.java new file mode 100644 index 00000000000..05773c701de --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/SourcePollingChannelAdapterObservationTests.java @@ -0,0 +1,156 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.management.observation; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.test.SampleTestRunner; +import io.micrometer.tracing.test.simple.SpansAssert; + +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.EndpointId; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.config.EnableIntegrationManagement; +import org.springframework.integration.scheduling.PollerMetadata; +import org.springframework.integration.test.util.OnlyOnceTrigger; +import org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory; +import org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor; +import org.springframework.integration.transaction.PseudoTransactionManager; +import org.springframework.integration.transaction.TransactionInterceptorBuilder; +import org.springframework.integration.transaction.TransactionSynchronizationFactory; +import org.springframework.messaging.MessageChannel; +import org.springframework.transaction.interceptor.TransactionInterceptor; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * @author Artem Bilan + * + * @since 6.5 + */ +public class SourcePollingChannelAdapterObservationTests extends SampleTestRunner { + + @Override + public TracingSetup[] getTracingSetup() { + return new TracingSetup[] {TracingSetup.IN_MEMORY_BRAVE}; + } + + @Override + public SampleTestRunnerConsumer yourCode() { + return (bb, meterRegistry) -> { + ObservationRegistry observationRegistry = getObservationRegistry(); + + try (AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext()) { + applicationContext.registerBean(ObservationRegistry.class, () -> observationRegistry); + applicationContext.register(ObservationIntegrationTestConfiguration.class); + applicationContext.refresh(); + + var testConfiguration = applicationContext.getBean(ObservationIntegrationTestConfiguration.class); + + assertThat(testConfiguration.transactionLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + await().untilAsserted(() -> assertThat(bb.getFinishedSpans()).hasSize(5)); + + SpansAssert.assertThat(bb.getFinishedSpans()) + .haveSameTraceId() + .hasASpanWithName("dataMessageSource receive", spanAssert -> spanAssert + .hasTag(IntegrationObservation.GatewayTags.COMPONENT_TYPE.asString(), "message-source") + .hasKindEqualTo(Span.Kind.CONSUMER)) + .hasASpanWithName("processMessage send", spanAssert -> spanAssert + .hasTag(IntegrationObservation.ProducerTags.COMPONENT_NAME.asString(), "processMessage") + .hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "producer") + .hasKindEqualTo(Span.Kind.PRODUCER)) + .hasASpanWithName("dataHandler receive", spanAssert -> spanAssert + .hasTag(IntegrationObservation.HandlerTags.COMPONENT_NAME.asString(), "dataHandler") + .hasTag(IntegrationObservation.HandlerTags.COMPONENT_TYPE.asString(), "handler") + .hasKindEqualTo(Span.Kind.CONSUMER)) + .hasASpanWithName("afterCommit send", spanAssert -> spanAssert + .hasTag(IntegrationObservation.ProducerTags.COMPONENT_NAME.asString(), "afterCommit") + .hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "producer") + .hasKindEqualTo(Span.Kind.PRODUCER)) + .hasASpanWithName("commitHandler receive", spanAssert -> spanAssert + .hasTag(IntegrationObservation.HandlerTags.COMPONENT_NAME.asString(), "commitHandler") + .hasTag(IntegrationObservation.HandlerTags.COMPONENT_TYPE.asString(), "handler") + .hasKindEqualTo(Span.Kind.CONSUMER)); + }; + } + + @Configuration + @EnableIntegration + @EnableIntegrationManagement(observationPatterns = "*") + public static class ObservationIntegrationTestConfiguration { + + final CountDownLatch transactionLatch = new CountDownLatch(1); + + @Bean + MessageChannel afterCommit() { + return new DirectChannel(); + } + + @Bean + TransactionSynchronizationFactory testTransactionSynchronizationFactory(MessageChannel afterCommit) { + var processor = new ExpressionEvaluatingTransactionSynchronizationProcessor(); + processor.setAfterCommitChannel(afterCommit); + return new DefaultTransactionSynchronizationFactory(processor); + } + + @Bean + PollerMetadata pollerMetadata(TransactionSynchronizationFactory testTransactionSynchronizationFactory) { + PollerMetadata pollerMetadata = new PollerMetadata(); + pollerMetadata.setTrigger(new OnlyOnceTrigger()); + TransactionInterceptor transactionInterceptor = + new TransactionInterceptorBuilder() + .transactionManager(new PseudoTransactionManager()) + .build(); + pollerMetadata.setAdviceChain(List.of(transactionInterceptor)); + pollerMetadata.setTransactionSynchronizationFactory(testTransactionSynchronizationFactory); + return pollerMetadata; + } + + @EndpointId("dataMessageSource") + @InboundChannelAdapter(channel = "processMessage", poller = @Poller("pollerMetadata")) + String emitData() { + return "some data"; + } + + @EndpointId("dataHandler") + @ServiceActivator(inputChannel = "processMessage") + void processData(String data) { + + } + + @EndpointId("commitHandler") + @ServiceActivator(inputChannel = "afterCommit") + void afterCommit(String data) { + transactionLatch.countDown(); + } + + } + +} diff --git a/src/reference/antora/modules/ROOT/pages/metrics.adoc b/src/reference/antora/modules/ROOT/pages/metrics.adoc index 4f0fe2be96d..e29af1cca29 100644 --- a/src/reference/antora/modules/ROOT/pages/metrics.adoc +++ b/src/reference/antora/modules/ROOT/pages/metrics.adoc @@ -164,14 +164,15 @@ The meters are not gathered in this case independently, but delegated to an appr The following Spring Integration components are instrumented with observation logic each with a respective convention: -* `MessageProducerSupport`, being the inbound endpoint of the flow, is considered as a `CONSUMER` span type and uses the `IntegrationObservation.HANDLER` API; -* MessagingGatewaySupport` is an inbound request-reply endpoint, and is considered as a `SERVER` span type. +* `MessageProducerSupport`, being an inbound endpoint of the flow, is considered as a `CONSUMER` span type and uses the `IntegrationObservation.HANDLER` API; +* `MessagingGatewaySupport` is an inbound request-reply endpoint, and is considered as a `SERVER` span type. It uses the `IntegrationObservation.GATEWAY` API; * An `AbstractMessageChannel.send()` operation is the only Spring Integration API where it produces messages. So, it is treated as a `PRODUCER` span type and uses the `IntegrationObservation.PRODCUER` API. This makes more sense when a channel is a distributed implementation (e.g. `PublishSubscribeKafkaChannel` or `ZeroMqChannel`) and trace information has to be added to the message. So, the `IntegrationObservation.PRODUCER` observation is based on a `MessageSenderContext` where Spring Integration supplies a `MutableMessage` to allow a subsequent tracing `Propagator` to add headers, so they are available to the consumer; -* An `AbstractMessageHandler` is a `CONSUMER` span type and uses the `IntegrationObservation.HANDLER` API. +* An `AbstractMessageHandler` is a `CONSUMER` span type and uses the `IntegrationObservation.HANDLER` API; +* The `SourcePollingChannelAdapter` (starting with version 6.5), being an inbound endpoint of the flow, is considered as a `CONSUMER` span type and uses the `IntegrationObservation.HANDLER` API. An observation production on the `IntegrationManagement` components can be customized via `ObservationConvention` configuration. For example an `AbstractMessageHandler` expects a `MessageReceiverObservationConvention` via its `setObservationConvention()` API. diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 5216a7a6abb..2f44836489e 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -31,4 +31,11 @@ See xref:aggregator.adoc[Aggregator] for more information. == The `LockRegistry` in the `MessageStore` The `AbstractMessageGroupStore` now can be configured with a `LockRegistry` to perform series of persistent operation atomically. -See xref:message-store.adoc#use-lock-registry[Use LockRegistry] for more information. \ No newline at end of file +See xref:message-store.adoc#use-lock-registry[Use LockRegistry] for more information. + +[[x6.4-observation-changes]] +== Micrometer Observation Changes + +The `SourcePollingChannelAdapter` endpoint now starts a `CONSUMER` kind observation for the received message. +The `MessageReceiverContext` now distinguishes between `handler`, `message-source` and `message-producer` values for the `spring.integration.type` low cardinality tag. +See xref:metrics.adoc#micrometer-observation[Micrometer Observation] for more information. \ No newline at end of file