Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messaging convention reviewed #1297

Merged
merged 18 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/semantic-conventions.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,22 @@ not values defined by spec.
| `db.hbase` | Y | -, HBase is not supported |
| `db.redis.database_index` | N | only set for Lettuce driver, not for Jedis |
| `db.mongodb.collection` | Y | - |

## Messaging

Attribute name | Required? | Implemented? |
| -------------- | :-----: | :---: |
| `messaging.system` | Y | + |
| `messaging.destination` | Y | + |
| `messaging.destination_kind` | Y | + |
| `messaging.temp_destination` | N | - |
| `messaging.protocol` | N | - |
| `messaging.protocol_version` | N | - |
| `messaging.url` | N | - |
| `messaging.message_id` | N | only for JMS |
| `messaging.conversation_id` | N | only for JMS |
| `messaging.message_payload_size_bytes` | N | only for RabbitMQ and Kafka [1] |
| `messaging.message_payload_compressed_size_bytes` | N | - |
| `messaging.operation` | for consumers only | +

**[1]:** Kafka consumer instrumentation sets this to the serialized size of the value
76 changes: 42 additions & 34 deletions instrumentation/jms-1.1/src/jms2Test/groovy/JMS2Test.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
* SPDX-License-Identifier: Apache-2.0
*/

import static io.opentelemetry.trace.Span.Kind.CLIENT
import static io.opentelemetry.trace.Span.Kind.CONSUMER
import static io.opentelemetry.trace.Span.Kind.PRODUCER

import com.google.common.io.Files
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.asserts.TraceAssert
import io.opentelemetry.instrumentation.auto.jms.JMSTracer
import io.opentelemetry.instrumentation.auto.jms.Operation
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.trace.attributes.SemanticAttributes
import java.util.concurrent.CountDownLatch
Expand All @@ -29,7 +30,6 @@ import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory
import org.hornetq.core.server.HornetQServer
import org.hornetq.core.server.HornetQServers
import org.hornetq.jms.client.HornetQMessageConsumer
import org.hornetq.jms.client.HornetQTextMessage
import spock.lang.Shared

Expand Down Expand Up @@ -99,7 +99,7 @@ class JMS2Test extends AgentTestRunner {
producerSpan(it, 0, destinationType, destinationName)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, messageId, false, HornetQMessageConsumer, traces[0][0])
consumerSpan(it, 0, destinationType, destinationName, messageId, null, Operation.receive)
}
}

Expand All @@ -111,8 +111,8 @@ class JMS2Test extends AgentTestRunner {
destination | destinationType | destinationName
session.createQueue("someQueue") | "queue" | "someQueue"
session.createTopic("someTopic") | "topic" | "someTopic"
session.createTemporaryQueue() | "queue" | "<temporary>"
session.createTemporaryTopic() | "topic" | "<temporary>"
session.createTemporaryQueue() | "queue" | JMSTracer.TEMP_DESTINATION_NAME
session.createTemporaryTopic() | "topic" | JMSTracer.TEMP_DESTINATION_NAME
}

def "sending to a MessageListener on #destinationName #destinationType generates a span"() {
Expand All @@ -136,7 +136,7 @@ class JMS2Test extends AgentTestRunner {
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), true, consumer.messageListener.class, span(0))
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), Operation.process)
}
}
// This check needs to go after all traces have been accounted for
Expand All @@ -150,8 +150,8 @@ class JMS2Test extends AgentTestRunner {
destination | destinationType | destinationName
session.createQueue("someQueue") | "queue" | "someQueue"
session.createTopic("someTopic") | "topic" | "someTopic"
session.createTemporaryQueue() | "queue" | "<temporary>"
session.createTemporaryTopic() | "topic" | "<temporary>"
session.createTemporaryQueue() | "queue" | JMSTracer.TEMP_DESTINATION_NAME
session.createTemporaryTopic() | "topic" | JMSTracer.TEMP_DESTINATION_NAME
}

def "failing to receive message with receiveNoWait on #destinationName #destinationType works"() {
Expand All @@ -167,12 +167,14 @@ class JMS2Test extends AgentTestRunner {
trace(0, 1) { // Consumer trace
span(0) {
hasNoParent()
name destinationType + "/" + destinationName + " receive"
kind CLIENT
name destinationName + " receive"
kind CONSUMER
errored false
attributes {
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms"
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName
"${SemanticAttributes.MESSAGING_OPERATION.key}" Operation.receive.name()
}
}
}
Expand Down Expand Up @@ -200,12 +202,15 @@ class JMS2Test extends AgentTestRunner {
trace(0, 1) { // Consumer trace
span(0) {
hasNoParent()
name destinationType + "/" + destinationName + " receive"
kind CLIENT
name destinationName + " receive"
kind CONSUMER
errored false
attributes {
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms"
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName
"${SemanticAttributes.MESSAGING_OPERATION.key}" Operation.receive.name()

}
}
}
Expand All @@ -222,42 +227,45 @@ class JMS2Test extends AgentTestRunner {

static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
trace.span(index) {
name destinationType + "/" + destinationName + " send"
name destinationName + " send"
kind PRODUCER
errored false
hasNoParent()
attributes {
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName
if (destinationName == "<temporary>") {
"${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key()}" true
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType
if (destinationName == JMSTracer.TEMP_DESTINATION_NAME) {
"${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key}" true
}
}
}
}

static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, boolean messageListener, Class origin, Object parentOrLinkedSpan) {
// passing messageId = null will verify message.id is not captured,
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
// any other value for messageId will verify that message.id is captured and has that same value
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, Operation operation) {
iNikem marked this conversation as resolved.
Show resolved Hide resolved
trace.span(index) {
name destinationType + "/" + destinationName + " receive"
if (messageListener) {
kind CONSUMER
name destinationName + " " + operation.name()
kind CONSUMER
if (parentOrLinkedSpan != null) {
childOf((SpanData) parentOrLinkedSpan)
} else {
kind CLIENT
hasNoParent()
hasLink((SpanData) parentOrLinkedSpan)
}
errored false
attributes {
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType
"${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType
"${SemanticAttributes.MESSAGING_OPERATION.key}" operation.name()
if (messageId != null) {
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key()}" messageId
} else {
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key()}" String
//In some tests we don't know exact messageId, so we pass "" and verify just the existence of the attribute
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" { it == messageId || messageId == "" }
}
if (destinationName == "<temporary>") {
"${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key()}" true
if (destinationName == JMSTracer.TEMP_DESTINATION_NAME) {
"${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key}" true
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import static JMS2Test.consumerSpan
import static JMS2Test.producerSpan

import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.instrumentation.auto.jms.Operation
import javax.jms.ConnectionFactory
import listener.Config
import org.hornetq.jms.client.HornetQMessageConsumer
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.jms.core.JmsTemplate
import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter

class SpringListenerJMS2Test extends AgentTestRunner {
def "receiving message in spring listener generates spans"() {
Expand All @@ -26,10 +25,10 @@ class SpringListenerJMS2Test extends AgentTestRunner {
assertTraces(2) {
trace(0, 2) {
producerSpan(it, 0, "queue", "SpringListenerJMS2")
consumerSpan(it, 1, "queue", "SpringListenerJMS2", null, true, MessagingMessageListenerAdapter, span(0))
consumerSpan(it, 1, "queue", "SpringListenerJMS2", "", span(0), Operation.process)
}
trace(1, 1) {
consumerSpan(it, 0, "queue", "SpringListenerJMS2", null, false, HornetQMessageConsumer, traces[0][0])
consumerSpan(it, 0, "queue", "SpringListenerJMS2", "", null, Operation.receive)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import static JMS2Test.producerSpan

import com.google.common.io.Files
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.instrumentation.auto.jms.Operation
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import javax.jms.Session
Expand All @@ -23,7 +24,6 @@ import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory
import org.hornetq.core.server.HornetQServer
import org.hornetq.core.server.HornetQServers
import org.hornetq.jms.client.HornetQMessageConsumer
import org.springframework.jms.core.JmsTemplate
import spock.lang.Shared

Expand Down Expand Up @@ -90,7 +90,7 @@ class SpringTemplateJMS2Test extends AgentTestRunner {
producerSpan(it, 0, destinationType, destinationName)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, receivedMessage.getJMSMessageID(), false, HornetQMessageConsumer, traces[0][0])
consumerSpan(it, 0, destinationType, destinationName, receivedMessage.getJMSMessageID(), null, Operation.receive)
}
}

Expand Down Expand Up @@ -123,14 +123,13 @@ class SpringTemplateJMS2Test extends AgentTestRunner {
producerSpan(it, 0, destinationType, destinationName)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, msgId.get(), false, HornetQMessageConsumer, traces[0][0])
consumerSpan(it, 0, destinationType, destinationName, msgId.get(), null, Operation.receive)
}
trace(2, 1) {
// receive doesn't propagate the trace, so this is a root
producerSpan(it, 0, "queue", "<temporary>")
producerSpan(it, 0, "queue", "(temporary)")
}
trace(3, 1) {
consumerSpan(it, 0, "queue", "<temporary>", receivedMessage.getJMSMessageID(), false, HornetQMessageConsumer, traces[2][0])
consumerSpan(it, 0, "queue", "(temporary)", receivedMessage.getJMSMessageID(), null, Operation.receive)
}
}

Expand Down

This file was deleted.

Loading