Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message handler #8331

Draft
wants to merge 48 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
4889f26
BatchMessageHandler
atshaw43 Apr 20, 2023
0131ff2
Fixing parent bug
atshaw43 Apr 21, 2023
5934f86
Fixing readme
atshaw43 Apr 21, 2023
0ce11c5
Applying spot check
atshaw43 Apr 21, 2023
2f912cf
Addressing style checks and addign test for invalid upstream span con…
atshaw43 Apr 21, 2023
9bb39f4
Adding OTEL format
atshaw43 Apr 21, 2023
7aca0d4
Addressing PR
atshaw43 Apr 24, 2023
d6ec4d0
Applying spot check
atshaw43 Apr 24, 2023
1741b7b
Fixing style check
atshaw43 Apr 24, 2023
a34eba7
Adding clarification
atshaw43 Apr 24, 2023
b23a221
Refactorting some logic
atshaw43 Apr 25, 2023
1e2402e
Fixing stylecheck
atshaw43 Apr 25, 2023
88f0484
Addressing PR
atshaw43 May 31, 2023
d5af1fd
Running spotless
atshaw43 May 31, 2023
0e08dad
Fixing readme
atshaw43 May 31, 2023
072f189
Adding exception test
atshaw43 May 31, 2023
f036493
Running spotless
atshaw43 Jun 1, 2023
6874ab7
Update instrumentation/message-handler/library/src/main/java/io/opent…
atshaw43 Jun 1, 2023
9bc1864
Merging
atshaw43 Jun 1, 2023
af3ce0f
Running spotlessapply
atshaw43 Jun 1, 2023
3b625c4
Changing to AWS SDK SQS message object
atshaw43 Jun 2, 2023
f9433b1
Fixing annotations file
atshaw43 Jun 2, 2023
149d9b6
Fixing annotations
atshaw43 Jun 2, 2023
27eefc3
Merge branch 'open-telemetry:main' into MessageHandler
atshaw43 Jun 5, 2023
1b8273e
Adding customer gitter for message atribute values
atshaw43 Jun 5, 2023
9a46997
Applying spotless
atshaw43 Jun 5, 2023
1b14ab3
Addresing PR
atshaw43 Jun 7, 2023
40c7a67
Applying spotless
atshaw43 Jun 7, 2023
6aef8cb
Fixing test runner
atshaw43 Jun 7, 2023
dfceb72
Refactoring test code
atshaw43 Jun 7, 2023
a591789
Refactoring to use Messaging Extractors
atshaw43 Jun 9, 2023
371d793
Refactoring moduel heirarchy
atshaw43 Jun 13, 2023
5a51314
Refactoring modules
atshaw43 Jun 13, 2023
46feb15
Remove Warning
atshaw43 Jun 13, 2023
8505b8e
Fixing spot check
atshaw43 Jun 13, 2023
e81aa56
Refactor to receiver
atshaw43 Jun 15, 2023
24af0c9
Cleanup
atshaw43 Jun 15, 2023
80eacf1
Changing to process messages only
atshaw43 Jul 21, 2023
19c419b
Fixing typo
atshaw43 Jul 21, 2023
c0958a8
Adding more information regarding what console will look like
atshaw43 Jul 21, 2023
5f41613
Adding support for SQSEvent.SQSMessage
atshaw43 Jul 25, 2023
b810b67
Fixing AWS SDK
atshaw43 Aug 14, 2023
1e0104e
Splitting out lambda
atshaw43 Aug 16, 2023
ed4d6d2
Fixing tests
atshaw43 Aug 22, 2023
018cd53
Polishing
atshaw43 Aug 28, 2023
bbbc0aa
Addressing PR
atshaw43 Aug 28, 2023
c14d4c1
Fixing Read Me
atshaw43 Aug 28, 2023
c6a0fd8
Adding back removed import
atshaw43 Aug 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
testImplementation("io.opentelemetry:opentelemetry-extension-trace-propagators")
testImplementation("com.google.guava:guava")

implementation(project(":instrumentation:message-handler:message-handler-1.0:library"))
testImplementation(project(":instrumentation:aws-lambda:aws-lambda-events-2.2:testing"))
testImplementation("uk.org.webcompere:system-stubs-jupiter")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.awslambdaevents.v2_2;

import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.messagehandler.MessageHandler;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

public abstract class SqsMessageHandler extends MessageHandler<SQSEvent.SQSMessage> {
private static final String AWS_TRACE_HEADER_SQS_ATTRIBUTE_KEY = "AWSTraceHeader";
static final String AWS_TRACE_HEADER_PROPAGATOR_KEY = "x-amzn-trace-id";

private final OpenTelemetry openTelemetry;
private final String destination;
private SpanKindExtractor<Collection<SQSEvent.SQSMessage>> spanKindExtractor;
private SpanNameExtractor<Collection<SQSEvent.SQSMessage>> spanNameExtractor;

public SqsMessageHandler(OpenTelemetry openTelemetry, String destination) {
this.openTelemetry = openTelemetry;
this.destination = destination;
this.spanKindExtractor = SpanKindExtractor.alwaysConsumer();
spanNameExtractor = e -> destination + " process";
}

public void setSpanNameExtactor(SpanNameExtractor<Collection<SQSEvent.SQSMessage>> spanNameExtractor) {
this.spanNameExtractor = spanNameExtractor;
}

@Override
protected Instrumenter<Collection<SQSEvent.SQSMessage>, Void> getMessageInstrumenter() {
return Instrumenter.<Collection<SQSEvent.SQSMessage>, Void>builder(
openTelemetry, "io.opentelemetry.aws-lambda-events-2.2", spanNameExtractor)
.addAttributesExtractor(getMessageOperationAttributeExtractor())
.addSpanLinksExtractor(getSpanLinksExtractor())
.buildInstrumenter(spanKindExtractor);
}

public void setSpanKindExtractor(SpanKindExtractor<Collection<SQSEvent.SQSMessage>> spanKindExtractor) {
this.spanKindExtractor = spanKindExtractor;
}

protected MessagingAttributesGetter<Collection<SQSEvent.SQSMessage>, Void>
getMessageingAttributesGetter() {
String destination = this.destination;

return new MessagingAttributesGetter<Collection<SQSEvent.SQSMessage>, Void>() {
@Nullable
@Override
public String getSystem(Collection<SQSEvent.SQSMessage> v) {
return "AmazonSQS";
}

@Nullable
@Override
@SuppressWarnings({"deprecation"}) // Inheriting from interface
public String getDestinationKind(Collection<SQSEvent.SQSMessage> v) {
return null;
}

@Nullable
@Override
public String getDestination(Collection<SQSEvent.SQSMessage> v) {
return destination;
}

@Override
public boolean isTemporaryDestination(Collection<SQSEvent.SQSMessage> v) {
return false;
}

@Nullable
@Override
public String getConversationId(Collection<SQSEvent.SQSMessage> v) {
return null;
}

@Nullable
@Override
public Long getMessagePayloadSize(Collection<SQSEvent.SQSMessage> v) {
long total = 0;

for (SQSEvent.SQSMessage message : v) {
total += getPayloadSize(message);
}

return total;
}

@Nullable
@Override
public Long getMessagePayloadCompressedSize(Collection<SQSEvent.SQSMessage> v) {
return null;
}

@Nullable
@Override
public String getMessageId(Collection<SQSEvent.SQSMessage> request, Void v) {
return null;
}
};
}

protected AttributesExtractor<Collection<SQSEvent.SQSMessage>, Void> getMessageOperationAttributeExtractor() {
return MessagingAttributesExtractor.create(
getMessageingAttributesGetter(), MessageOperation.PROCESS);
}

protected SpanLinksExtractor<Collection<SQSEvent.SQSMessage>> getSpanLinksExtractor() {
return (spanLinks, parentContext, request) -> {
for (SQSEvent.SQSMessage message : request) {
SpanContext messageSpanCtx = getUpstreamContext(openTelemetry, message);

if (messageSpanCtx!= null && messageSpanCtx.isValid()) {
spanLinks.addLink(messageSpanCtx);
}
}
};
}

public int getPayloadSize(SQSEvent.SQSMessage message) {
return message.getBody().length();
}

public SpanContext getUpstreamContext(OpenTelemetry openTelemetry, SQSEvent.SQSMessage message) {
String parentHeader = null;

if (message.getAttributes() != null) {
parentHeader = message.getAttributes().get(AWS_TRACE_HEADER_SQS_ATTRIBUTE_KEY);
}

if (parentHeader == null &&
message.getMessageAttributes() != null)
{
// We need to do a case-insensitive search
for (Map.Entry<String, SQSEvent.MessageAttribute> entry: message.getMessageAttributes().entrySet()) {
if (entry.getKey().equalsIgnoreCase(AWS_TRACE_HEADER_PROPAGATOR_KEY)) {
parentHeader = entry.getValue().getStringValue();
break;
}
}
}

if (parentHeader != null) {
Context xrayContext =
AwsXrayPropagator.getInstance()
.extract(
Context.root(),
Collections.singletonMap(AWS_TRACE_HEADER_PROPAGATOR_KEY, parentHeader),
MapGetter.INSTANCE);

return Span.fromContext(xrayContext).getSpanContext();
}

return null;
}

private enum MapGetter implements TextMapGetter<Map<String, String>> {
INSTANCE;

@Override
public Iterable<String> keys(Map<String, String> map) {
return map.keySet();
}

@Override
public String get(Map<String, String> map, String s) {
return map.get(s.toLowerCase(Locale.ROOT));
}
}
}
Loading