Skip to content

Commit

Permalink
Wait for rocketmq message to arrive before asserting spans (#5591)
Browse files Browse the repository at this point in the history
* Wait for rocketmq message to arrive before asserting spans

* Apply suggestions from code review

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>

* trigger build

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
  • Loading branch information
laurit and trask authored Mar 17, 2022
1 parent 1ee60aa commit 6497794
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ package io.opentelemetry.instrumentation.rocketmq
import base.BaseConf
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.client.producer.SendCallback
import org.apache.rocketmq.client.producer.SendResult
import org.apache.rocketmq.client.producer.SendStatus
import org.apache.rocketmq.common.message.Message
import org.apache.rocketmq.remoting.common.RemotingHelper
import spock.lang.Shared
Expand Down Expand Up @@ -66,17 +69,27 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
BaseConf.deleteTempDir()
}

def setup() {
tracingMessageListener.reset()
}

def "test rocketmq produce callback"() {
CompletableFuture<SendResult> result = new CompletableFuture<>()
when:
producer.send(msg, new SendCallback() {
@Override
void onSuccess(SendResult sendResult) {
result.complete(sendResult)
}

@Override
void onException(Throwable throwable) {
result.completeExceptionally(throwable)
}
})
result.get(10, TimeUnit.SECONDS).sendStatus == SendStatus.SEND_OK
// waiting longer than assertTraces below does on its own because of CI flakiness
tracingMessageListener.waitForMessages()

then:
assertTraces(1) {
Expand Down Expand Up @@ -123,8 +136,11 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
def "test rocketmq produce and consume"() {
when:
runWithSpan("parent") {
producer.send(msg)
SendResult sendResult = producer.send(msg)
assert sendResult.sendStatus == SendStatus.SEND_OK
}
// waiting longer than assertTraces below does on its own because of CI flakiness
tracingMessageListener.waitForMessages()

then:
assertTraces(1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.instrumentation.rocketmq

import java.util.concurrent.TimeUnit
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
Expand Down Expand Up @@ -33,7 +34,7 @@ class TracingMessageListener implements MessageListenerOrderly {
}

void waitForMessages() {
messageReceived.await()
messageReceived.await(30, TimeUnit.SECONDS)
}

int getLastBatchSize() {
Expand Down

0 comments on commit 6497794

Please sign in to comment.