Skip to content

Commit

Permalink
Allow overriding OTLP gRPC authority. (#4514)
Browse files Browse the repository at this point in the history
  • Loading branch information
anuraaga authored Jun 10, 2022
1 parent 29f274e commit 9dc1be5
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
Comparing source compatibility of against
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++! NEW METHOD: PUBLIC(+) boolean getBoolean(java.lang.String, boolean)
+++! NEW METHOD: PUBLIC(+) double getDouble(java.lang.String, double)
+++! NEW METHOD: PUBLIC(+) java.time.Duration getDuration(java.lang.String, java.time.Duration)
+++! NEW METHOD: PUBLIC(+) int getInt(java.lang.String, int)
+++! NEW METHOD: PUBLIC(+) java.util.List getList(java.lang.String, java.util.List)
+++! NEW METHOD: PUBLIC(+) long getLong(java.lang.String, long)
+++! NEW METHOD: PUBLIC(+) java.util.Map getMap(java.lang.String, java.util.Map)
+++! NEW METHOD: PUBLIC(+) java.lang.String getString(java.lang.String, java.lang.String)
+++ NEW INTERFACE: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider (not serializable)
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
+++ NEW SUPERCLASS: java.lang.Object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.opentelemetry.exporter.internal.grpc.MarshalerInputStream;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import java.io.InputStream;
import javax.annotation.Nullable;

// Adapted from the protoc generated code for CollectorServiceGrpc.
final class MarshalerCollectorServiceGrpc {
Expand Down Expand Up @@ -56,8 +57,11 @@ public PostSpansResponse parse(InputStream stream) {
.setResponseMarshaller(RESPONSE_MARSHALER)
.build();

static CollectorServiceFutureStub newFutureStub(Channel channel) {
return CollectorServiceFutureStub.newStub(CollectorServiceFutureStub::new, channel);
static CollectorServiceFutureStub newFutureStub(
Channel channel, @Nullable String authorityOverride) {
return CollectorServiceFutureStub.newStub(
(c, options) -> new CollectorServiceFutureStub(c, options.withAuthority(authorityOverride)),
channel);
}

static final class CollectorServiceFutureStub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import java.io.InputStream;
import javax.annotation.Nullable;

// Adapted from the protoc generated code for MetricsServiceGrpc.
final class MarshalerMetricsServiceGrpc {
Expand Down Expand Up @@ -59,8 +60,11 @@ public ExportMetricsServiceResponse parse(InputStream stream) {
.setResponseMarshaller(RESPONSE_MARSHALER)
.build();

static MetricsServiceFutureStub newFutureStub(Channel channel) {
return MetricsServiceFutureStub.newStub(MetricsServiceFutureStub::new, channel);
static MetricsServiceFutureStub newFutureStub(
Channel channel, @Nullable String authorityOverride) {
return MetricsServiceFutureStub.newStub(
(c, options) -> new MetricsServiceFutureStub(c, options.withAuthority(authorityOverride)),
channel);
}

static final class MetricsServiceFutureStub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import java.io.InputStream;
import javax.annotation.Nullable;

// Adapted from the protoc generated code for TraceServiceGrpc.
final class MarshalerTraceServiceGrpc {
Expand Down Expand Up @@ -53,8 +54,11 @@ public ExportTraceServiceResponse parse(InputStream stream) {
.setResponseMarshaller(RESPONSE_MARSHALER)
.build();

static TraceServiceFutureStub newFutureStub(io.grpc.Channel channel) {
return TraceServiceFutureStub.newStub(TraceServiceFutureStub::new, channel);
static TraceServiceFutureStub newFutureStub(
io.grpc.Channel channel, @Nullable String authorityOverride) {
return TraceServiceFutureStub.newStub(
(c, options) -> new TraceServiceFutureStub(c, options.withAuthority(authorityOverride)),
channel);
}

static final class TraceServiceFutureStub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;

Expand All @@ -35,7 +35,7 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>

private final String exporterName;
private final String type;
private final Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory;
private final BiFunction<ManagedChannel, String, MarshalerServiceStub<T, ?, ?>> stubFactory;
private final String grpcServiceName;

@Nullable private ManagedChannel channel;
Expand All @@ -47,14 +47,15 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>
@Nullable private byte[] privateKeyPem;
@Nullable private byte[] certificatePem;
@Nullable RetryPolicy retryPolicy;
@Nullable String authorityOverride;
private MeterProvider meterProvider = MeterProvider.noop();

/** Creates a new {@link DefaultGrpcExporterBuilder}. */
// Visible for testing
public DefaultGrpcExporterBuilder(
String exporterName,
String type,
Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory,
BiFunction<ManagedChannel, String, MarshalerServiceStub<T, ?, ?>> stubFactory,
long defaultTimeoutSecs,
URI defaultEndpoint,
String grpcServiceName) {
Expand Down Expand Up @@ -110,6 +111,10 @@ public GrpcExporterBuilder<T> setClientTls(byte[] privateKeyPem, byte[] certific

@Override
public DefaultGrpcExporterBuilder<T> addHeader(String key, String value) {
if (key.equals("host")) {
authorityOverride = value;
return this;
}
if (metadata == null) {
metadata = new Metadata();
}
Expand Down Expand Up @@ -167,7 +172,7 @@ public GrpcExporter<T> build() {

Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE;
MarshalerServiceStub<T, ?, ?> stub =
stubFactory.apply(channel).withCompression(codec.getMessageEncoding());
stubFactory.apply(channel, authorityOverride).withCompression(codec.getMessageEncoding());
return new DefaultGrpcExporter<>(
exporterName, type, channel, stub, meterProvider, timeoutNanos);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.net.URI;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/**
Expand All @@ -26,7 +26,7 @@ static <T extends Marshaler> GrpcExporterBuilder<T> builder(
String type,
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>>> stubFactory,
Supplier<BiFunction<ManagedChannel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcServiceName,
String grpcEndpointPath) {
return GrpcExporterUtil.exporterBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import java.net.URI;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -35,7 +35,7 @@ static <T extends Marshaler> GrpcExporterBuilder<T> exporterBuilder(
String type,
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>>> stubFactory,
Supplier<BiFunction<ManagedChannel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcServiceName,
String grpcEndpointPath) {
if (USE_OKHTTP) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void setRetryPolicyOnDelegate_DefaultGrpcExporterBuilder() throws URISyntaxExcep
RetryPolicy retryPolicy = RetryPolicy.getDefault();
DefaultGrpcExporterBuilder<?> builder =
new DefaultGrpcExporterBuilder<>(
"otlp", "test", unused -> null, 0, new URI("http://localhost"), "test");
"otlp", "test", (u1, u2) -> null, 0, new URI("http://localhost"), "test");

RetryUtil.setRetryPolicyOnDelegate(new WithDelegate(builder), retryPolicy);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import java.io.InputStream;
import javax.annotation.Nullable;

// Adapted from the protoc generated code for LogsServiceGrpc.
final class MarshalerLogsServiceGrpc {
Expand Down Expand Up @@ -57,8 +58,10 @@ public ExportLogsServiceResponse parse(InputStream stream) {
.setResponseMarshaller(RESPONSE_MARSHALER)
.build();

static LogsServiceFutureStub newFutureStub(Channel channel) {
return LogsServiceFutureStub.newStub(LogsServiceFutureStub::new, channel);
static LogsServiceFutureStub newFutureStub(Channel channel, @Nullable String authorityOverride) {
return LogsServiceFutureStub.newStub(
(c, options) -> new LogsServiceFutureStub(c, options.withAuthority(authorityOverride)),
channel);
}

static final class LogsServiceFutureStub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaStatusException;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServiceRequestContext;
Expand Down Expand Up @@ -73,6 +74,9 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {

private static final AtomicInteger attempts = new AtomicInteger();

private static final ConcurrentLinkedQueue<HttpRequest> httpRequests =
new ConcurrentLinkedQueue<>();

@RegisterExtension
@Order(1)
static final SelfSignedCertificateExtension certificate = new SelfSignedCertificateExtension();
Expand Down Expand Up @@ -131,6 +135,7 @@ private CollectorService(

@Override
protected CompletionStage<byte[]> handleMessage(ServiceRequestContext ctx, byte[] message) {
httpRequests.add(ctx.request());
attempts.incrementAndGet();
T request;
try {
Expand Down Expand Up @@ -177,6 +182,7 @@ void reset() {
exportedResourceTelemetry.clear();
grpcErrors.clear();
attempts.set(0);
httpRequests.clear();
}

@Test
Expand Down Expand Up @@ -500,6 +506,27 @@ void nonRetryableError(int code) {
assertThat(attempts).hasValue(1);
}

@Test
void overrideHost() {
List<T> telemetry = Collections.singletonList(generateFakeTelemetry());
TelemetryExporter<T> exporter =
exporterBuilder()
.setEndpoint(server.httpUri().toString())
.addHeader("host", "opentelemetry")
.build();
try {
assertThat(exporter.export(telemetry).join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
} finally {
exporter.shutdown();
}
List<U> expectedResourceTelemetry = toProto(telemetry);
assertThat(exportedResourceTelemetry).containsExactlyElementsOf(expectedResourceTelemetry);

assertThat(httpRequests)
.singleElement()
.satisfies(req -> assertThat(req.authority()).isEqualTo("opentelemetry"));
}

@Test
@SuppressWarnings("PreferJavaTimeOverload")
void validConfig() {
Expand Down

1 comment on commit 9dc1be5

@Gwar84
Copy link

@Gwar84 Gwar84 commented on 9dc1be5 Jul 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh it is I the one they call x

Please sign in to comment.