From 5fa7c41f20e6419889a5f15fb38f785ed7ade23e Mon Sep 17 00:00:00 2001
From: Bogdan Drutu <bogdandrutu@gmail.com>
Date: Thu, 14 Dec 2023 10:27:58 -0800
Subject: [PATCH] [chore] fix resourcetotelemetry usage in carbonexporter

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
---
 exporter/carbonexporter/exporter.go      |  9 +++-
 exporter/carbonexporter/exporter_test.go | 64 ++++++++++++++++--------
 2 files changed, 51 insertions(+), 22 deletions(-)

diff --git a/exporter/carbonexporter/exporter.go b/exporter/carbonexporter/exporter.go
index 2b18fdd71461..586faf279f90 100644
--- a/exporter/carbonexporter/exporter.go
+++ b/exporter/carbonexporter/exporter.go
@@ -12,6 +12,8 @@ import (
 	"go.opentelemetry.io/collector/exporter"
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.opentelemetry.io/collector/pdata/pmetric"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
 )
 
 // newCarbonExporter returns a new Carbon exporter.
@@ -20,7 +22,7 @@ func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metri
 		connPool: newTCPConnPool(cfg.Endpoint, cfg.Timeout),
 	}
 
-	return exporterhelper.NewMetricsExporter(
+	exp, err := exporterhelper.NewMetricsExporter(
 		context.TODO(),
 		set,
 		cfg,
@@ -29,6 +31,11 @@ func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metri
 		exporterhelper.WithQueue(cfg.QueueConfig),
 		exporterhelper.WithRetry(cfg.RetryConfig),
 		exporterhelper.WithShutdown(sender.Shutdown))
+	if err != nil {
+		return nil, err
+	}
+
+	return resourcetotelemetry.WrapMetricsExporter(cfg.ResourceToTelemetryConfig, exp), nil
 }
 
 // carbonSender is the struct tying the translation function and the TCP
diff --git a/exporter/carbonexporter/exporter_test.go b/exporter/carbonexporter/exporter_test.go
index 8101e4717527..37c321c84be9 100644
--- a/exporter/carbonexporter/exporter_test.go
+++ b/exporter/carbonexporter/exporter_test.go
@@ -27,6 +27,7 @@ import (
 	conventions "go.opentelemetry.io/collector/semconv/v1.9.0"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
 )
 
 func TestNewWithDefaultConfig(t *testing.T) {
@@ -45,10 +46,31 @@ func TestConsumeMetricsNoServer(t *testing.T) {
 		exportertest.NewNopCreateSettings())
 	require.NoError(t, err)
 	require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))
-	require.Error(t, exp.ConsumeMetrics(context.Background(), generateLargeBatch()))
+	require.Error(t, exp.ConsumeMetrics(context.Background(), generateSmallBatch()))
 	require.NoError(t, exp.Shutdown(context.Background()))
 }
 
+func TestConsumeMetricsWithResourceToTelemetry(t *testing.T) {
+	addr := testutil.GetAvailableLocalAddress(t)
+	cs := newCarbonServer(t, addr, "test_0;k0=v0;k1=v1;service.name=test_carbon 0")
+	// Each metric point will generate one Carbon line, set up the wait
+	// for all of them.
+	cs.start(t, 1)
+
+	exp, err := newCarbonExporter(
+		&Config{
+			TCPAddr:                   confignet.TCPAddr{Endpoint: addr},
+			TimeoutSettings:           exporterhelper.TimeoutSettings{Timeout: 5 * time.Second},
+			ResourceToTelemetryConfig: resourcetotelemetry.Settings{Enabled: true},
+		},
+		exportertest.NewNopCreateSettings())
+	require.NoError(t, err)
+	require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))
+	require.NoError(t, exp.ConsumeMetrics(context.Background(), generateSmallBatch()))
+	assert.NoError(t, exp.Shutdown(context.Background()))
+	cs.shutdownAndVerify(t)
+}
+
 func TestConsumeMetrics(t *testing.T) {
 	if runtime.GOOS == "windows" {
 		t.Skip("skipping test on windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10147")
@@ -94,7 +116,7 @@ func TestConsumeMetrics(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			addr := testutil.GetAvailableLocalAddress(t)
-			cs := newCarbonServer(t, addr)
+			cs := newCarbonServer(t, addr, "")
 			// Each metric point will generate one Carbon line, set up the wait
 			// for all of them.
 			cs.start(t, tt.numProducers*tt.writesPerProducer*tt.md.DataPointCount())
@@ -133,25 +155,21 @@ func TestConsumeMetrics(t *testing.T) {
 }
 
 func generateSmallBatch() pmetric.Metrics {
-	metrics := pmetric.NewMetrics()
-	m := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
-	m.SetName("test_gauge")
-	dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
-	dp.Attributes().PutStr("k0", "v0")
-	dp.Attributes().PutStr("k1", "v1")
-	dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
-	dp.SetDoubleValue(123)
-	return metrics
+	return generateMetricsBatch(1)
 }
 
 func generateLargeBatch() pmetric.Metrics {
+	return generateMetricsBatch(1024)
+}
+
+func generateMetricsBatch(size int) pmetric.Metrics {
 	ts := time.Now()
 	metrics := pmetric.NewMetrics()
 	rm := metrics.ResourceMetrics().AppendEmpty()
 	rm.Resource().Attributes().PutStr(conventions.AttributeServiceName, "test_carbon")
 	ms := rm.ScopeMetrics().AppendEmpty().Metrics()
 
-	for i := 0; i < 1028; i++ {
+	for i := 0; i < size; i++ {
 		m := ms.AppendEmpty()
 		m.SetName("test_" + strconv.Itoa(i))
 		dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
@@ -165,19 +183,21 @@ func generateLargeBatch() pmetric.Metrics {
 }
 
 type carbonServer struct {
-	ln         *net.TCPListener
-	doneServer *atomic.Bool
-	wg         sync.WaitGroup
+	ln                    *net.TCPListener
+	doneServer            *atomic.Bool
+	wg                    sync.WaitGroup
+	expectedContainsValue string
 }
 
-func newCarbonServer(t *testing.T, addr string) *carbonServer {
+func newCarbonServer(t *testing.T, addr string, expectedContainsValue string) *carbonServer {
 	laddr, err := net.ResolveTCPAddr("tcp", addr)
 	require.NoError(t, err)
 	ln, err := net.ListenTCP("tcp", laddr)
 	require.NoError(t, err)
 	return &carbonServer{
-		ln:         ln,
-		doneServer: &atomic.Bool{},
+		ln:                    ln,
+		doneServer:            &atomic.Bool{},
+		expectedContainsValue: expectedContainsValue,
 	}
 }
 
@@ -198,14 +218,16 @@ func (cs *carbonServer) start(t *testing.T, numExpectedReq int) {
 
 				reader := bufio.NewReader(conn)
 				for {
-					// Actual metric validation is done by other tests, here it
-					// is just flow.
-					_, err := reader.ReadBytes(byte('\n'))
+					buf, err := reader.ReadBytes(byte('\n'))
 					if errors.Is(err, io.EOF) {
 						return
 					}
 					require.NoError(t, err)
 
+					if cs.expectedContainsValue != "" {
+						assert.Contains(t, string(buf), cs.expectedContainsValue)
+					}
+
 					cs.wg.Done()
 				}
 			}(conn)