diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cb4a036c7c..93bde7f4f9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Remove duplicate checkout from GitHub Actions workflow (#1407) - `NewExporter` from `exporters/otlp` now takes a `ProtocolDriver` as a parameter. (#1369) - Many OTLP Exporter options became gRPC ProtocolDriver options. (#1369) +- Unify endpoint API that related to OTel exporter. (#1401) ### Removed diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index 3ba6d029e12..11c763f0ce0 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -46,12 +46,12 @@ func initProvider() func() { // If the OpenTelemetry Collector is running on a local cluster (minikube or // microk8s), it should be accessible through the NodePort service at the - // `localhost:30080` address. Otherwise, replace `localhost` with the - // address of your cluster. If you run the app inside k8s, then you can + // `localhost:30080` endpoint. Otherwise, replace `localhost` with the + // endpoint of your cluster. If you run the app inside k8s, then you can // probably connect directly to the service through dns driver := otlp.NewGRPCDriver( otlp.WithInsecure(), - otlp.WithAddress("localhost:30080"), + otlp.WithEndpoint("localhost:30080"), otlp.WithGRPCDialOption(grpc.WithBlock()), // useful for testing ) exp, err := otlp.NewExporter(ctx, driver) diff --git a/exporters/otlp/example_test.go b/exporters/otlp/example_test.go index 56ec757fe55..c6cc4bfa115 100644 --- a/exporters/otlp/example_test.go +++ b/exporters/otlp/example_test.go @@ -135,11 +135,11 @@ func Example_withDifferentSignalCollectors() { // Set different endpoints for the metrics and traces collectors metricsDriver := otlp.NewGRPCDriver( otlp.WithInsecure(), - otlp.WithAddress("localhost:30080"), + otlp.WithEndpoint("localhost:30080"), ) tracesDriver := otlp.NewGRPCDriver( otlp.WithInsecure(), - otlp.WithAddress("localhost:30082"), + otlp.WithEndpoint("localhost:30082"), ) splitCfg := otlp.SplitConfig{ ForMetrics: metricsDriver, diff --git a/exporters/otlp/grpcconnection.go b/exporters/otlp/grpcconnection.go index c3c3af4dd4a..14cfcd9e6e9 100644 --- a/exporters/otlp/grpcconnection.go +++ b/exporters/otlp/grpcconnection.go @@ -202,7 +202,7 @@ func (oc *grpcConnection) setConnection(cc *grpc.ClientConn) bool { } func (oc *grpcConnection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) { - addr := oc.c.collectorAddr + endpoint := oc.c.collectorEndpoint dialOpts := []grpc.DialOption{} if oc.c.grpcServiceConfig != "" { @@ -223,7 +223,7 @@ func (oc *grpcConnection) dialToCollector(ctx context.Context) (*grpc.ClientConn ctx, cancel := oc.contextWithStop(ctx) defer cancel() ctx = oc.contextWithMetadata(ctx) - return grpc.DialContext(ctx, addr, dialOpts...) + return grpc.DialContext(ctx, endpoint, dialOpts...) } func (oc *grpcConnection) contextWithMetadata(ctx context.Context) context.Context { diff --git a/exporters/otlp/grpcdriver.go b/exporters/otlp/grpcdriver.go index 885cb1a1522..7d865360f10 100644 --- a/exporters/otlp/grpcdriver.go +++ b/exporters/otlp/grpcdriver.go @@ -40,7 +40,7 @@ type grpcDriver struct { func NewGRPCDriver(opts ...GRPCConnectionOption) ProtocolDriver { cfg := grpcConnectionConfig{ - collectorAddr: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort), + collectorEndpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort), grpcServiceConfig: DefaultGRPCServiceConfig, } for _, opt := range opts { diff --git a/exporters/otlp/grpcoptions.go b/exporters/otlp/grpcoptions.go index 6d4e5609756..bc8d9973f20 100644 --- a/exporters/otlp/grpcoptions.go +++ b/exporters/otlp/grpcoptions.go @@ -62,7 +62,7 @@ const ( type grpcConnectionConfig struct { canDialInsecure bool - collectorAddr string + collectorEndpoint string compressor string reconnectionPeriod time.Duration grpcServiceConfig string @@ -82,12 +82,12 @@ func WithInsecure() GRPCConnectionOption { } } -// WithAddress allows one to set the address that the exporter will +// WithEndpoint allows one to set the endpoint that the exporter will // connect to the collector on. If unset, it will instead try to use // connect to DefaultCollectorHost:DefaultCollectorPort. -func WithAddress(addr string) GRPCConnectionOption { +func WithEndpoint(endpoint string) GRPCConnectionOption { return func(cfg *grpcConnectionConfig) { - cfg.collectorAddr = addr + cfg.collectorEndpoint = endpoint } } diff --git a/exporters/otlp/mock_collector_test.go b/exporters/otlp/mock_collector_test.go index 0a4d4d670bd..0c46375a62d 100644 --- a/exporters/otlp/mock_collector_test.go +++ b/exporters/otlp/mock_collector_test.go @@ -26,16 +26,16 @@ import ( "google.golang.org/grpc" metadata "google.golang.org/grpc/metadata" - colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" - coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" + collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" + collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1" metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1" tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" ) -func makeMockCollector(t *testing.T) *mockCol { - return &mockCol{ +func makeMockCollector(t *testing.T) *mockCollector { + return &mockCollector{ t: t, traceSvc: &mockTraceService{ rsm: map[string]*tracepb.ResourceSpans{}, @@ -74,7 +74,7 @@ func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans { return rss } -func (mts *mockTraceService) Export(ctx context.Context, exp *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) { +func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.ExportTraceServiceRequest) (*collectortracepb.ExportTraceServiceResponse, error) { mts.mu.Lock() mts.headers, _ = metadata.FromIncomingContext(ctx) defer mts.mu.Unlock() @@ -100,7 +100,7 @@ func (mts *mockTraceService) Export(ctx context.Context, exp *coltracepb.ExportT } } } - return &coltracepb.ExportTraceServiceResponse{}, nil + return &collectortracepb.ExportTraceServiceResponse{}, nil } func resourceString(res *resourcepb.Resource) string { @@ -133,7 +133,7 @@ func (mms *mockMetricService) getMetrics() []*metricpb.Metric { return append(m, mms.metrics...) } -func (mms *mockMetricService) Export(ctx context.Context, exp *colmetricpb.ExportMetricsServiceRequest) (*colmetricpb.ExportMetricsServiceResponse, error) { +func (mms *mockMetricService) Export(ctx context.Context, exp *collectormetricpb.ExportMetricsServiceRequest) (*collectormetricpb.ExportMetricsServiceResponse, error) { mms.mu.Lock() for _, rm := range exp.GetResourceMetrics() { // TODO (rghetia) handle multiple resource and library info. @@ -142,26 +142,26 @@ func (mms *mockMetricService) Export(ctx context.Context, exp *colmetricpb.Expor } } mms.mu.Unlock() - return &colmetricpb.ExportMetricsServiceResponse{}, nil + return &collectormetricpb.ExportMetricsServiceResponse{}, nil } -type mockCol struct { +type mockCollector struct { t *testing.T traceSvc *mockTraceService metricSvc *mockMetricService - address string + endpoint string stopFunc func() error stopOnce sync.Once } -var _ coltracepb.TraceServiceServer = (*mockTraceService)(nil) -var _ colmetricpb.MetricsServiceServer = (*mockMetricService)(nil) +var _ collectortracepb.TraceServiceServer = (*mockTraceService)(nil) +var _ collectormetricpb.MetricsServiceServer = (*mockMetricService)(nil) var errAlreadyStopped = fmt.Errorf("already stopped") -func (mc *mockCol) stop() error { +func (mc *mockCollector) stop() error { var err = errAlreadyStopped mc.stopOnce.Do(func() { if mc.stopFunc != nil { @@ -191,37 +191,37 @@ func (mc *mockCol) stop() error { return err } -func (mc *mockCol) getSpans() []*tracepb.Span { +func (mc *mockCollector) getSpans() []*tracepb.Span { return mc.traceSvc.getSpans() } -func (mc *mockCol) getResourceSpans() []*tracepb.ResourceSpans { +func (mc *mockCollector) getResourceSpans() []*tracepb.ResourceSpans { return mc.traceSvc.getResourceSpans() } -func (mc *mockCol) getHeaders() metadata.MD { +func (mc *mockCollector) getHeaders() metadata.MD { return mc.traceSvc.getHeaders() } -func (mc *mockCol) getMetrics() []*metricpb.Metric { +func (mc *mockCollector) getMetrics() []*metricpb.Metric { return mc.metricSvc.getMetrics() } -// runMockCol is a helper function to create a mockCol -func runMockCol(t *testing.T) *mockCol { - return runMockColAtAddr(t, "localhost:0") +// runMockCollector is a helper function to create a mock Collector +func runMockCollector(t *testing.T) *mockCollector { + return runMockCollectorAtEndpoint(t, "localhost:0") } -func runMockColAtAddr(t *testing.T, addr string) *mockCol { - ln, err := net.Listen("tcp", addr) +func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector { + ln, err := net.Listen("tcp", endpoint) if err != nil { - t.Fatalf("Failed to get an address: %v", err) + t.Fatalf("Failed to get an endpoint: %v", err) } srv := grpc.NewServer() mc := makeMockCollector(t) - coltracepb.RegisterTraceServiceServer(srv, mc.traceSvc) - colmetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc) + collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc) + collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc) go func() { _ = srv.Serve(ln) }() @@ -233,7 +233,7 @@ func runMockColAtAddr(t *testing.T, addr string) *mockCol { _, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String()) - mc.address = "localhost:" + collectorPortStr + mc.endpoint = "localhost:" + collectorPortStr mc.stopFunc = deferFunc return mc diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index 1030ab9f008..4865518a877 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -81,10 +81,10 @@ func TestNewExporter_endToEnd(t *testing.T) { } } -func newGRPCExporter(t *testing.T, ctx context.Context, address string, additionalOpts ...otlp.GRPCConnectionOption) *otlp.Exporter { +func newGRPCExporter(t *testing.T, ctx context.Context, endpoint string, additionalOpts ...otlp.GRPCConnectionOption) *otlp.Exporter { opts := []otlp.GRPCConnectionOption{ otlp.WithInsecure(), - otlp.WithAddress(address), + otlp.WithEndpoint(endpoint), otlp.WithReconnectionPeriod(50 * time.Millisecond), } @@ -97,7 +97,7 @@ func newGRPCExporter(t *testing.T, ctx context.Context, address string, addition return exp } -func runEndToEndTest(t *testing.T, ctx context.Context, exp *otlp.Exporter, mcTraces, mcMetrics *mockCol) { +func runEndToEndTest(t *testing.T, ctx context.Context, exp *otlp.Exporter, mcTraces, mcMetrics *mockCollector) { pOpts := []sdktrace.TracerProviderOption{ sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), sdktrace.WithBatcher( @@ -326,7 +326,7 @@ func runEndToEndTest(t *testing.T, ctx context.Context, exp *otlp.Exporter, mcTr } func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) { - mc := runMockColAtAddr(t, "localhost:56561") + mc := runMockCollectorAtEndpoint(t, "localhost:56561") defer func() { _ = mc.stop() @@ -335,7 +335,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO <-time.After(5 * time.Millisecond) ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...) + exp := newGRPCExporter(t, ctx, mc.endpoint, additionalOpts...) defer func() { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -348,13 +348,13 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO } func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) { - mc := runMockCol(t) + mc := runMockCollector(t) defer func() { _ = mc.stop() }() ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.address) + exp := newGRPCExporter(t, ctx, mc.endpoint) defer func() { if err := exp.Shutdown(ctx); err != nil { panic(err) @@ -380,11 +380,11 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) { } func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { - mc := runMockCol(t) + mc := runMockCollector(t) reconnectionPeriod := 20 * time.Millisecond ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.address, + exp := newGRPCExporter(t, ctx, mc.endpoint, otlp.WithReconnectionPeriod(reconnectionPeriod)) defer func() { _ = exp.Shutdown(ctx) @@ -404,12 +404,12 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}), "transport: Error while dialing dial tcp %s: connect: connection refused", - mc.address, + mc.endpoint, ) // Now resurrect the collector by making a new one but reusing the - // old address, and the collector should reconnect automatically. - nmc := runMockColAtAddr(t, mc.address) + // old endpoint, and the collector should reconnect automatically. + nmc := runMockCollectorAtEndpoint(t, mc.endpoint) // Give the exporter sometime to reconnect <-time.After(reconnectionPeriod * 4) @@ -444,37 +444,37 @@ func TestNewExporter_collectorOnBadConnection(t *testing.T) { if err != nil { t.Fatalf("Failed to grab an available port: %v", err) } - // Firstly close the "collector's" channel: optimistically this address won't get reused ASAP + // Firstly close the "collector's" channel: optimistically this endpoint won't get reused ASAP // However, our goal of closing it is to simulate an unavailable connection _ = ln.Close() _, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String()) - address := fmt.Sprintf("localhost:%s", collectorPortStr) + endpoint := fmt.Sprintf("localhost:%s", collectorPortStr) ctx := context.Background() - exp := newGRPCExporter(t, ctx, address) + exp := newGRPCExporter(t, ctx, endpoint) _ = exp.Shutdown(ctx) } -func TestNewExporter_withAddress(t *testing.T) { - mc := runMockCol(t) +func TestNewExporter_withEndpoint(t *testing.T) { + mc := runMockCollector(t) defer func() { _ = mc.stop() }() ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.address) + exp := newGRPCExporter(t, ctx, mc.endpoint) _ = exp.Shutdown(ctx) } func TestNewExporter_withHeaders(t *testing.T) { - mc := runMockCol(t) + mc := runMockCollector(t) defer func() { _ = mc.stop() }() ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.address, + exp := newGRPCExporter(t, ctx, mc.endpoint, otlp.WithHeaders(map[string]string{"header1": "value1"})) require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}})) @@ -488,7 +488,7 @@ func TestNewExporter_withHeaders(t *testing.T) { } func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { - mc := runMockCol(t) + mc := runMockCollector(t) defer func() { _ = mc.stop() @@ -497,7 +497,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { <-time.After(5 * time.Millisecond) ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.address) + exp := newGRPCExporter(t, ctx, mc.endpoint) defer func() { _ = exp.Shutdown(ctx) @@ -690,9 +690,9 @@ func discSpanSnapshot() *exporttrace.SpanSnapshot { func TestDisconnected(t *testing.T) { ctx := context.Background() - // The address is whatever, we want to be disconnected. But we + // The endpoint is whatever, we want to be disconnected. But we // setting a blocking connection, so dialing to the invalid - // address actually fails. + // endpoint actually fails. exp := newGRPCExporter(t, ctx, "invalid", otlp.WithReconnectionPeriod(time.Hour), otlp.WithGRPCDialOption( @@ -720,7 +720,7 @@ func (emptyCheckpointSet) RLock() {} func (emptyCheckpointSet) RUnlock() {} func TestEmptyData(t *testing.T) { - mc := runMockColAtAddr(t, "localhost:56561") + mc := runMockCollectorAtEndpoint(t, "localhost:56561") defer func() { _ = mc.stop() @@ -729,7 +729,7 @@ func TestEmptyData(t *testing.T) { <-time.After(5 * time.Millisecond) ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.address) + exp := newGRPCExporter(t, ctx, mc.endpoint) defer func() { assert.NoError(t, exp.Shutdown(ctx)) }() @@ -750,7 +750,7 @@ func (failCheckpointSet) RLock() {} func (failCheckpointSet) RUnlock() {} func TestFailedMetricTransform(t *testing.T) { - mc := runMockColAtAddr(t, "localhost:56561") + mc := runMockCollectorAtEndpoint(t, "localhost:56561") defer func() { _ = mc.stop() @@ -759,7 +759,7 @@ func TestFailedMetricTransform(t *testing.T) { <-time.After(5 * time.Millisecond) ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.address) + exp := newGRPCExporter(t, ctx, mc.endpoint) defer func() { assert.NoError(t, exp.Shutdown(ctx)) }() @@ -768,8 +768,8 @@ func TestFailedMetricTransform(t *testing.T) { } func TestMultiConnectionDriver(t *testing.T) { - mcTraces := runMockCol(t) - mcMetrics := runMockCol(t) + mcTraces := runMockCollector(t) + mcMetrics := runMockCollector(t) defer func() { _ = mcTraces.stop() @@ -784,10 +784,10 @@ func TestMultiConnectionDriver(t *testing.T) { otlp.WithGRPCDialOption(grpc.WithBlock()), } optsTraces := append([]otlp.GRPCConnectionOption{ - otlp.WithAddress(mcTraces.address), + otlp.WithEndpoint(mcTraces.endpoint), }, commonOpts...) optsMetrics := append([]otlp.GRPCConnectionOption{ - otlp.WithAddress(mcMetrics.address), + otlp.WithEndpoint(mcMetrics.endpoint), }, commonOpts...) tracesDriver := otlp.NewGRPCDriver(optsTraces...)