From 04b9b64e53035403ad4809e8ab9e365efb75dbca Mon Sep 17 00:00:00 2001 From: Jakub Wach Date: Thu, 11 Mar 2021 12:46:33 +0100 Subject: [PATCH] AWS SDK - S3 - SNS - SQS propagation tests (#2549) * AWS SDK - S3 - SNS - SQS propagation tests * code review --- .../src/test/groovy/AwsConnector.groovy | 188 ++++++ .../src/test/groovy/S3TracingTest.groovy | 541 ++++++++++++++---- .../src/test/groovy/SnsTracingTest.groovy | 104 +--- 3 files changed, 627 insertions(+), 206 deletions(-) create mode 100644 instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/AwsConnector.groovy diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/AwsConnector.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/AwsConnector.groovy new file mode 100644 index 000000000000..b04b85b98b67 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/AwsConnector.groovy @@ -0,0 +1,188 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import com.amazonaws.regions.Regions +import com.amazonaws.services.s3.AmazonS3Client +import com.amazonaws.services.s3.model.BucketNotificationConfiguration +import com.amazonaws.services.s3.model.ObjectListing +import com.amazonaws.services.s3.model.QueueConfiguration +import com.amazonaws.services.s3.model.S3Event +import com.amazonaws.services.s3.model.S3ObjectSummary +import com.amazonaws.services.s3.model.SetBucketNotificationConfigurationRequest +import com.amazonaws.services.s3.model.TopicConfiguration +import com.amazonaws.services.sns.AmazonSNSAsyncClient +import com.amazonaws.services.sns.model.CreateTopicResult +import com.amazonaws.services.sns.model.SetTopicAttributesRequest +import com.amazonaws.services.sqs.AmazonSQSAsyncClient +import com.amazonaws.services.sqs.model.GetQueueAttributesRequest +import com.amazonaws.services.sqs.model.PurgeQueueRequest +import com.amazonaws.services.sqs.model.ReceiveMessageRequest +import org.slf4j.LoggerFactory +import org.testcontainers.containers.localstack.LocalStackContainer +import org.testcontainers.containers.output.Slf4jLogConsumer +import org.testcontainers.utility.DockerImageName + +class AwsConnector { + + private LocalStackContainer localstack + + private AmazonSQSAsyncClient sqsClient + private AmazonS3Client s3Client + private AmazonSNSAsyncClient snsClient + + static localstack() { + AwsConnector awsConnector = new AwsConnector() + + awsConnector.localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest")) + .withServices(LocalStackContainer.Service.SQS, LocalStackContainer.Service.SNS) + .withEnv("DEBUG", "1") + .withEnv("SQS_PROVIDER", "elasticmq") + awsConnector.localstack.start() + awsConnector.localstack.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("test"))) + + awsConnector.sqsClient = AmazonSQSAsyncClient.asyncBuilder() + .withEndpointConfiguration(awsConnector.localstack.getEndpointConfiguration(LocalStackContainer.Service.SQS)) + .withCredentials(awsConnector.localstack.getDefaultCredentialsProvider()) + .build() + + awsConnector.s3Client = AmazonS3Client.builder() + .withEndpointConfiguration(awsConnector.localstack.getEndpointConfiguration(LocalStackContainer.Service.S3)) + .withCredentials(awsConnector.localstack.getDefaultCredentialsProvider()) + .build() + + awsConnector.snsClient = AmazonSNSAsyncClient.asyncBuilder() + .withEndpointConfiguration(awsConnector.localstack.getEndpointConfiguration(LocalStackContainer.Service.SNS)) + .withCredentials(awsConnector.localstack.getDefaultCredentialsProvider()) + .build() + + return awsConnector + } + + static liveAws() { + AwsConnector awsConnector = new AwsConnector() + + awsConnector.sqsClient = AmazonSQSAsyncClient.asyncBuilder() + .withRegion(Regions.US_EAST_1) + .build() + + awsConnector.s3Client = AmazonS3Client.builder() + .withRegion(Regions.US_EAST_1) + .build() + + awsConnector.snsClient = AmazonSNSAsyncClient.asyncBuilder() + .withRegion(Regions.US_EAST_1) + .build() + + return awsConnector + } + + def createQueue(String queueName) { + println "Create queue ${queueName}" + return sqsClient.createQueue(queueName).getQueueUrl() + } + + def getQueueArn(String queueUrl) { + println "Get ARN for queue ${queueUrl}" + return sqsClient.getQueueAttributes( + new GetQueueAttributesRequest(queueUrl) + .withAttributeNames("QueueArn")).getAttributes() + .get("QueueArn") + } + + def setTopicPublishingPolicy(String topicArn) { + println "Set policy for topic ${topicArn}" + snsClient.setTopicAttributes(new SetTopicAttributesRequest(topicArn, "Policy", String.format(SNS_POLICY, topicArn))) + } + + private static final String SNS_POLICY = "{" + + " \"Statement\": [" + + " {" + + " \"Effect\": \"Allow\"," + + " \"Principal\": \"*\"," + + " \"Action\": \"sns:Publish\"," + + " \"Resource\": \"%s\"" + + " }]" + + "}" + + def setQueuePublishingPolicy(String queueUrl, String queueArn) { + println "Set policy for queue ${queueArn}" + sqsClient.setQueueAttributes(queueUrl, Collections.singletonMap("Policy", String.format(SQS_POLICY, queueArn))) + } + + private static final String SQS_POLICY = "{" + + " \"Statement\": [" + + " {" + + " \"Effect\": \"Allow\"," + + " \"Principal\": \"*\"," + + " \"Action\": \"sqs:SendMessage\"," + + " \"Resource\": \"%s\"" + + " }]" + + "}" + + def createBucket(String bucketName) { + println "Create bucket ${bucketName}" + s3Client.createBucket(bucketName) + } + + def deleteBucket(String bucketName) { + println "Delete bucket ${bucketName}" + ObjectListing objectListing = s3Client.listObjects(bucketName) + Iterator objIter = objectListing.getObjectSummaries().iterator() + while (objIter.hasNext()) { + s3Client.deleteObject(bucketName, objIter.next().getKey()) + } + s3Client.deleteBucket(bucketName) + } + + def enableS3ToSqsNotifications(String bucketName, String sqsQueueArn) { + println "Enable notification for bucket ${bucketName} to queue ${sqsQueueArn}" + BucketNotificationConfiguration notificationConfiguration = new BucketNotificationConfiguration() + notificationConfiguration.addConfiguration("sqsQueueConfig", + new QueueConfiguration(sqsQueueArn, EnumSet.of(S3Event.ObjectCreatedByPut))) + s3Client.setBucketNotificationConfiguration(new SetBucketNotificationConfigurationRequest( + bucketName, notificationConfiguration)) + } + + def enableS3ToSnsNotifications(String bucketName, String snsTopicArn) { + println "Enable notification for bucket ${bucketName} to topic ${snsTopicArn}" + BucketNotificationConfiguration notificationConfiguration = new BucketNotificationConfiguration() + notificationConfiguration.addConfiguration("snsTopicConfig", + new TopicConfiguration(snsTopicArn, EnumSet.of(S3Event.ObjectCreatedByPut))) + s3Client.setBucketNotificationConfiguration(new SetBucketNotificationConfigurationRequest( + bucketName, notificationConfiguration)) + } + + def createTopicAndSubscribeQueue(String topicName, String queueArn) { + println "Create topic ${topicName} and subscribe to queue ${queueArn}" + CreateTopicResult ctr = snsClient.createTopic(topicName) + snsClient.subscribe(ctr.getTopicArn(), "sqs", queueArn) + return ctr.getTopicArn() + } + + def receiveMessage(String queueUrl) { + println "Receive message from queue ${queueUrl}" + sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20)) + } + + def purgeQueue(String queueUrl) { + println "Purge queue ${queueUrl}" + sqsClient.purgeQueue(new PurgeQueueRequest(queueUrl)) + } + + def putSampleData(String bucketName) { + println "Put sample data to bucket ${bucketName}" + s3Client.putObject(bucketName, "otelTestKey", "otelTestData") + } + + def publishSampleNotification(String topicArn) { + snsClient.publish(topicArn, "Hello There") + } + + def disconnect() { + if (localstack != null) { + localstack.stop() + } + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy index 35a0efeca805..93c4557dbdd1 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/S3TracingTest.groovy @@ -6,120 +6,42 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import com.amazonaws.services.s3.AmazonS3Client -import com.amazonaws.services.s3.model.BucketNotificationConfiguration -import com.amazonaws.services.s3.model.ObjectListing -import com.amazonaws.services.s3.model.QueueConfiguration -import com.amazonaws.services.s3.model.S3Event -import com.amazonaws.services.s3.model.S3ObjectSummary -import com.amazonaws.services.s3.model.SetBucketNotificationConfigurationRequest -import com.amazonaws.services.sqs.AmazonSQSAsyncClient -import com.amazonaws.services.sqs.model.GetQueueAttributesRequest -import com.amazonaws.services.sqs.model.PurgeQueueRequest -import com.amazonaws.services.sqs.model.ReceiveMessageRequest import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import org.slf4j.LoggerFactory -import org.testcontainers.containers.localstack.LocalStackContainer -import org.testcontainers.containers.output.Slf4jLogConsumer -import org.testcontainers.utility.DockerImageName import spock.lang.Ignore import spock.lang.Shared +@Ignore("Requires https://github.com/localstack/localstack/issues/3686 and #3669") + class S3TracingTest extends AgentInstrumentationSpecification { @Shared - LocalStackContainer localstack - @Shared - AmazonSQSAsyncClient sqsClient - @Shared - AmazonS3Client s3Client - - def setupSpec() { - - localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest")) - .withServices(LocalStackContainer.Service.SQS, LocalStackContainer.Service.SNS) - .withEnv("DEBUG", "1") - .withEnv("SQS_PROVIDER", "elasticmq") - localstack.start() - - sqsClient = AmazonSQSAsyncClient.asyncBuilder() - .withEndpointConfiguration(localstack.getEndpointConfiguration(LocalStackContainer.Service.SQS)) - .withCredentials(localstack.getDefaultCredentialsProvider()) - //.withRegion(Regions.US_EAST_1) - .build() - - s3Client = AmazonS3Client.builder() - .withEndpointConfiguration(localstack.getEndpointConfiguration(LocalStackContainer.Service.SNS)) - .withCredentials(localstack.getDefaultCredentialsProvider()) - //.withRegion(Regions.US_EAST_1) - .build() - - localstack.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("test"))) - } + AwsConnector awsConnector = AwsConnector.localstack() def cleanupSpec() { - if (localstack != null) { - localstack.stop() - } - } - - def createQueue(String queueName) { - return sqsClient.createQueue(queueName).getQueueUrl() - } - - def getQueueArn(String queueUrl) { - return sqsClient.getQueueAttributes( - new GetQueueAttributesRequest(queueUrl) - .withAttributeNames("QueueArn")).getAttributes() - .get("QueueArn") - } - - def setQueuePolicy(String queueUrl, String queueArn) { - sqsClient.setQueueAttributes(queueUrl, Collections.singletonMap("Policy", policy(queueArn))) + awsConnector.disconnect() } - def createBucket(String bucketName) { - s3Client.createBucket(bucketName) - } - - def deleteBucket(String bucketName) { - ObjectListing objectListing = s3Client.listObjects(bucketName) - Iterator objIter = objectListing.getObjectSummaries().iterator() - while (objIter.hasNext()) { - s3Client.deleteObject(bucketName, objIter.next().getKey()) - } - s3Client.deleteBucket(bucketName) - } - - def enableS3Notifications(String bucketName, String sqsQueueArn) { - BucketNotificationConfiguration notificationConfiguration = new BucketNotificationConfiguration() - notificationConfiguration.addConfiguration("sqsQueueConfig", - new QueueConfiguration(sqsQueueArn, EnumSet.of(S3Event.ObjectCreatedByPut))) - s3Client.setBucketNotificationConfiguration(new SetBucketNotificationConfigurationRequest( - bucketName, notificationConfiguration)) - } - - @Ignore("Requires https://github.com/localstack/localstack/issues/3686 to work with localstack") - def "simple S3 upload as producer - SQS consumer services"() { + def "S3 upload triggers SQS message"() { setup: String queueName = "s3ToSqsTestQueue" - String bucketName = "s3-sqs-test-bucket" + String bucketName = "otel-s3-to-sqs-test-bucket" - String queueUrl = createQueue(queueName) - String queueArn = getQueueArn(queueUrl) - setQueuePolicy(queueUrl, queueArn) - createBucket(bucketName) - enableS3Notifications(bucketName, queueArn) + String queueUrl = awsConnector.createQueue(queueName) + awsConnector.createBucket(bucketName) + + String queueArn = awsConnector.getQueueArn(queueUrl) + awsConnector.setQueuePublishingPolicy(queueUrl, queueArn) + awsConnector.enableS3ToSqsNotifications(bucketName, queueArn) when: // test message, auto created by AWS - sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20)) - s3Client.putObject(bucketName, "testKey", "testData") + awsConnector.receiveMessage(queueUrl) + awsConnector.putSampleData(bucketName) // traced message - sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20)) + awsConnector.receiveMessage(queueUrl) // cleanup - deleteBucket(bucketName) - sqsClient.purgeQueue(new PurgeQueueRequest(queueUrl)) + awsConnector.deleteBucket(bucketName) + awsConnector.purgeQueue(queueUrl) then: assertTraces(13) { @@ -148,17 +70,17 @@ class S3TracingTest extends AgentInstrumentationSpecification { trace(1, 1) { span(0) { - name "SQS.GetQueueAttributes" + name "S3.CreateBucket" kind CLIENT hasNoParent() attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" String - "aws.operation" "GetQueueAttributes" - "aws.queue.url" queueUrl - "aws.service" "AmazonSQS" + "aws.operation" "CreateBucket" + "aws.service" "Amazon S3" + "aws.bucket.name" bucketName "http.flavor" "1.1" - "http.method" "POST" + "http.method" "PUT" "http.status_code" 200 "http.url" String "net.peer.name" String @@ -170,13 +92,13 @@ class S3TracingTest extends AgentInstrumentationSpecification { trace(2, 1) { span(0) { - name "SQS.SetQueueAttributes" + name "SQS.GetQueueAttributes" kind CLIENT hasNoParent() attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" String - "aws.operation" "SetQueueAttributes" + "aws.operation" "GetQueueAttributes" "aws.queue.url" queueUrl "aws.service" "AmazonSQS" "http.flavor" "1.1" @@ -192,17 +114,17 @@ class S3TracingTest extends AgentInstrumentationSpecification { trace(3, 1) { span(0) { - name "S3.CreateBucket" + name "SQS.SetQueueAttributes" kind CLIENT hasNoParent() attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" String - "aws.operation" "CreateBucket" - "aws.service" "Amazon S3" - "aws.bucket.name" bucketName + "aws.operation" "SetQueueAttributes" + "aws.queue.url" queueUrl + "aws.service" "AmazonSQS" "http.flavor" "1.1" - "http.method" "PUT" + "http.method" "POST" "http.status_code" 200 "http.url" String "net.peer.name" String @@ -222,6 +144,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { "aws.endpoint" String "aws.operation" "SetBucketNotificationConfiguration" "aws.service" "Amazon S3" + "aws.bucket.name" bucketName "http.flavor" "1.1" "http.method" "PUT" "http.status_code" 200 @@ -289,6 +212,7 @@ class S3TracingTest extends AgentInstrumentationSpecification { "aws.endpoint" String "aws.operation" "PutObject" "aws.service" "Amazon S3" + "aws.bucket.name" bucketName "http.flavor" "1.1" "http.method" "PUT" "http.status_code" 200 @@ -431,17 +355,396 @@ class S3TracingTest extends AgentInstrumentationSpecification { } } - def policy(String queueArn) { - return String.format(SQS_POLICY, queueArn) - } + def "S3 upload triggers SNS topic notification, then creates SQS message"() { + setup: + String queueName = "s3ToSnsToSqsTestQueue" + String bucketName = "otel-s3-sns-sqs-test-bucket" + String topicName = "s3ToSnsToSqsTestTopic" - private static final String SQS_POLICY = "{" + - " \"Statement\": [" + - " {" + - " \"Effect\": \"Allow\"," + - " \"Principal\": \"*\"," + - " \"Action\": \"sqs:SendMessage\"," + - " \"Resource\": \"%s\"" + - " }]" + - "}" + String queueUrl = awsConnector.createQueue(queueName) + String queueArn = awsConnector.getQueueArn(queueUrl) + awsConnector.createBucket(bucketName) + String topicArn = awsConnector.createTopicAndSubscribeQueue(topicName, queueArn) + + awsConnector.setQueuePublishingPolicy(queueUrl, queueArn) + awsConnector.setTopicPublishingPolicy(topicArn) + awsConnector.enableS3ToSnsNotifications(bucketName, topicArn) + + when: + // test message, auto created by AWS + awsConnector.receiveMessage(queueUrl) + awsConnector.putSampleData(bucketName) + // traced message + awsConnector.receiveMessage(queueUrl) + // cleanup + awsConnector.deleteBucket(bucketName) + awsConnector.purgeQueue(queueUrl) + + then: + assertTraces(16) { + trace(0, 1) { + span(0) { + name "SQS.CreateQueue" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "CreateQueue" + "aws.queue.name" queueName + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(1, 1) { + span(0) { + name "SQS.GetQueueAttributes" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "GetQueueAttributes" + "aws.queue.url" queueUrl + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(2, 1) { + span(0) { + name "S3.CreateBucket" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "CreateBucket" + "aws.service" "Amazon S3" + "aws.bucket.name" bucketName + "http.flavor" "1.1" + "http.method" "PUT" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(3, 1) { + span(0) { + name "SNS.CreateTopic" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "CreateTopic" + "aws.service" "AmazonSNS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(4, 1) { + span(0) { + name "SNS.Subscribe" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "Subscribe" + "aws.service" "AmazonSNS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(5, 1) { + span(0) { + name "SQS.SetQueueAttributes" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "SetQueueAttributes" + "aws.queue.url" queueUrl + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(6, 1) { + span(0) { + name "SNS.SetTopicAttributes" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "SetTopicAttributes" + "aws.service" "AmazonSNS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(7, 1) { + span(0) { + name "S3.SetBucketNotificationConfiguration" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "SetBucketNotificationConfiguration" + "aws.service" "Amazon S3" + "aws.bucket.name" bucketName + "http.flavor" "1.1" + "http.method" "PUT" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + // test even receive + trace(8, 1) { + span(0) { + name "SQS.ReceiveMessage" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "ReceiveMessage" + "aws.queue.url" queueUrl + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + /** + * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). + * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear + */ + trace(9, 1) { + span(0) { + name "SQS.ReceiveMessage" + kind CONSUMER + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "ReceiveMessage" + "aws.queue.url" queueUrl + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" String + "http.user_agent" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(10, 2) { + span(0) { + name "S3.PutObject" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "PutObject" + "aws.service" "Amazon S3" + "aws.bucket.name" bucketName + "http.flavor" "1.1" + "http.method" "PUT" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + span(1) { + name "SQS.ReceiveMessage" + kind CONSUMER + childOf span(0) + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "ReceiveMessage" + "aws.queue.url" queueUrl + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" String + "http.user_agent" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + /** + * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). + * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear + */ + trace(11, 1) { + span(0) { + name "SQS.ReceiveMessage" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "ReceiveMessage" + "aws.queue.url" queueUrl + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(12, 1) { + span(0) { + name "S3.ListObjects" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "ListObjects" + "aws.service" "Amazon S3" + "aws.bucket.name" bucketName + "http.flavor" "1.1" + "http.method" "GET" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(13, 1) { + span(0) { + name "S3.DeleteObject" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "DeleteObject" + "aws.service" "Amazon S3" + "aws.bucket.name" bucketName + "http.flavor" "1.1" + "http.method" "DELETE" + "http.status_code" 204 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(14, 1) { + span(0) { + name "S3.DeleteBucket" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "DeleteBucket" + "aws.service" "Amazon S3" + "aws.bucket.name" bucketName + "http.flavor" "1.1" + "http.method" "DELETE" + "http.status_code" 204 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + trace(15, 1) { + span(0) { + name "SQS.PurgeQueue" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" String + "aws.operation" "PurgeQueue" + "aws.queue.url" queueUrl + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" String + "net.peer.name" String + "net.transport" "IP.TCP" + "net.peer.port" {it == null || Number} + } + } + } + } + } } \ No newline at end of file diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy index 7c4331d3b52e..572d9bae63f6 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SnsTracingTest.groovy @@ -6,16 +6,7 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import com.amazonaws.services.sns.AmazonSNSAsyncClient -import com.amazonaws.services.sns.model.CreateTopicResult -import com.amazonaws.services.sqs.AmazonSQSAsyncClient -import com.amazonaws.services.sqs.model.GetQueueAttributesRequest -import com.amazonaws.services.sqs.model.ReceiveMessageRequest import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import org.slf4j.LoggerFactory -import org.testcontainers.containers.localstack.LocalStackContainer -import org.testcontainers.containers.output.Slf4jLogConsumer -import org.testcontainers.utility.DockerImageName import spock.lang.Ignore import spock.lang.Shared @@ -23,73 +14,26 @@ import spock.lang.Shared class SnsTracingTest extends AgentInstrumentationSpecification { @Shared - LocalStackContainer localstack - @Shared - AmazonSQSAsyncClient sqsClient - @Shared - AmazonSNSAsyncClient snsClient - - def setupSpec() { - - localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest")) - .withServices(LocalStackContainer.Service.SQS, LocalStackContainer.Service.SNS) - .withEnv("DEBUG", "1") - .withEnv("SQS_PROVIDER", "elasticmq") - localstack.start() - - sqsClient = AmazonSQSAsyncClient.asyncBuilder() - .withEndpointConfiguration(localstack.getEndpointConfiguration(LocalStackContainer.Service.SQS)) - .withCredentials(localstack.getDefaultCredentialsProvider()) - .build() + AwsConnector awsConnector = AwsConnector.liveAws() - snsClient = AmazonSNSAsyncClient.asyncBuilder() - .withEndpointConfiguration(localstack.getEndpointConfiguration(LocalStackContainer.Service.SNS)) - .withCredentials(localstack.getDefaultCredentialsProvider()) - .build() - - localstack.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("test"))) - } def cleanupSpec() { - if (localstack != null) { - localstack.stop() - } - } - - def createQueue(String queueName) { - return sqsClient.createQueue(queueName).getQueueUrl() + awsConnector.disconnect() } - def getQueueArn(String queueUrl) { - return sqsClient.getQueueAttributes( - new GetQueueAttributesRequest(queueUrl) - .withAttributeNames("QueueArn")).getAttributes() - .get("QueueArn") - } - - def setQueuePolicy(String queueUrl, String queueArn) { - sqsClient.setQueueAttributes(queueUrl, Collections.singletonMap("Policy", policy(queueArn))) - } - - def createAndSubscribeTopic(String topicName, String queueArn) { - CreateTopicResult ctr = snsClient.createTopic(topicName) - snsClient.subscribe(ctr.getTopicArn(), "sqs", queueArn) - return ctr.getTopicArn() - } - - def "simple SNS producer - SQS consumer services"() { + def "SNS notification triggers SQS message consumed with AWS SDK"() { setup: String queueName = "snsToSqsTestQueue" String topicName = "snsToSqsTestTopic" - String queueUrl = createQueue(queueName) - String queueArn = getQueueArn(queueUrl) - setQueuePolicy(queueUrl, queueArn) - String topicArn = createAndSubscribeTopic(topicName, queueArn) + String queueUrl = awsConnector.createQueue(queueName) + String queueArn = awsConnector.getQueueArn(queueUrl) + awsConnector.setQueuePublishingPolicy(queueUrl, queueArn) + String topicArn = awsConnector.createTopicAndSubscribeQueue(topicName, queueArn) when: - snsClient.publish(topicArn, "Hello There") - sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20)) + awsConnector.publishSampleNotification(topicArn) + awsConnector.receiveMessage(queueUrl) then: assertTraces(7) { @@ -102,7 +46,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification { attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" String - "aws.operation" "CreateQueueRequest" + "aws.operation" "CreateQueue" "aws.queue.name" queueName "aws.service" "AmazonSQS" "http.flavor" "1.1" @@ -124,7 +68,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification { attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" String - "aws.operation" "GetQueueAttributesRequest" + "aws.operation" "GetQueueAttributes" "aws.queue.url" queueUrl "aws.service" "AmazonSQS" "http.flavor" "1.1" @@ -146,7 +90,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification { attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" String - "aws.operation" "SetQueueAttributesRequest" + "aws.operation" "SetQueueAttributes" "aws.queue.url" queueUrl "aws.service" "AmazonSQS" "http.flavor" "1.1" @@ -168,7 +112,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification { attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" String - "aws.operation" "CreateTopicRequest" + "aws.operation" "CreateTopic" "aws.service" "AmazonSNS" "http.flavor" "1.1" "http.method" "POST" @@ -189,7 +133,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification { attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" String - "aws.operation" "SubscribeRequest" + "aws.operation" "Subscribe" "aws.service" "AmazonSNS" "http.flavor" "1.1" "http.method" "POST" @@ -209,7 +153,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification { attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" String - "aws.operation" "PublishRequest" + "aws.operation" "Publish" "aws.service" "AmazonSNS" "http.flavor" "1.1" "http.method" "POST" @@ -227,7 +171,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification { attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" String - "aws.operation" "ReceiveMessageRequest" + "aws.operation" "ReceiveMessage" "aws.queue.url" queueUrl "aws.service" "AmazonSQS" "http.flavor" "1.1" @@ -253,7 +197,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification { attributes { "aws.agent" "java-aws-sdk" "aws.endpoint" String - "aws.operation" "ReceiveMessageRequest" + "aws.operation" "ReceiveMessage" "aws.queue.url" queueUrl "aws.service" "AmazonSQS" "http.flavor" "1.1" @@ -268,18 +212,4 @@ class SnsTracingTest extends AgentInstrumentationSpecification { } } } - - def policy(String queueArn) { - return String.format(SQS_POLICY, queueArn) - } - - private static final String SQS_POLICY = "{" + - " \"Statement\": [" + - " {" + - " \"Effect\": \"Allow\"," + - " \"Principal\": \"*\"," + - " \"Action\": \"sqs:SendMessage\"," + - " \"Resource\": \"%s\"" + - " }]" + - "}" } \ No newline at end of file