Commit

[chore] improve carbonexporter tests (#29878)
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
bogdandrutu authored Dec 14, 2023
1 parent c49dbc1 commit c3300dd
Showing 1 changed file with 129 additions and 162 deletions.
291 changes: 129 additions & 162 deletions exporter/carbonexporter/exporter_test.go
@@ -7,7 +7,6 @@ import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net"
"runtime"
@@ -30,225 +29,193 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
)

func TestNew(t *testing.T) {
func TestNewWithDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
got, err := newCarbonExporter(cfg, exportertest.NewNopCreateSettings())
assert.NotNil(t, got)
assert.NoError(t, err)
}

func TestConsumeMetricsData(t *testing.T) {
t.Skip("skipping flaky test, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/396")
smallBatch := pmetric.NewMetrics()
m := smallBatch.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
m.SetName("test_gauge")
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.Attributes().PutStr("k0", "v0")
dp.Attributes().PutStr("k1", "v1")
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
dp.SetDoubleValue(123)
largeBatch := generateLargeBatch()
func TestConsumeMetricsNoServer(t *testing.T) {
exp, err := newCarbonExporter(
&Config{
TCPAddr: confignet.TCPAddr{Endpoint: testutil.GetAvailableLocalAddress(t)},
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 5 * time.Second},
},
exportertest.NewNopCreateSettings())
require.NoError(t, err)
require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))
require.Error(t, exp.ConsumeMetrics(context.Background(), generateLargeBatch()))
require.NoError(t, exp.Shutdown(context.Background()))
}

func TestConsumeMetrics(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipping test on windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10147")
}

tests := []struct {
name string
md pmetric.Metrics
acceptClient bool
createServer bool
name string
md pmetric.Metrics
numProducers int
writesPerProducer int
}{
{
name: "small_batch",
md: smallBatch,
name: "small_batch",
md: generateSmallBatch(),
numProducers: 1,
writesPerProducer: 5,
},
{
name: "small_batch",
md: smallBatch,
createServer: true,
name: "large_batch",
md: generateLargeBatch(),
numProducers: 1,
writesPerProducer: 5,
},
{
name: "small_batch",
md: smallBatch,
createServer: true,
acceptClient: true,
name: "concurrent_small_batch",
md: generateSmallBatch(),
numProducers: 5,
writesPerProducer: 5,
},
{
name: "large_batch",
md: largeBatch,
name: "concurrent_large_batch",
md: generateLargeBatch(),
numProducers: 5,
writesPerProducer: 5,
},
{
name: "large_batch",
md: largeBatch,
createServer: true,
},
{
name: "large_batch",
md: largeBatch,
createServer: true,
acceptClient: true,
name: "high_concurrency",
md: generateLargeBatch(),
numProducers: 10,
writesPerProducer: 200,
},
}
for _, tt := range tests {
testName := fmt.Sprintf(
"%s_createServer_%t_acceptClient_%t", tt.name, tt.createServer, tt.acceptClient)
t.Run(testName, func(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
var ln *net.TCPListener
if tt.createServer {
laddr, err := net.ResolveTCPAddr("tcp", addr)
require.NoError(t, err)
ln, err = net.ListenTCP("tcp", laddr)
require.NoError(t, err)
defer ln.Close()
}

config := &Config{TCPAddr: confignet.TCPAddr{Endpoint: addr}, TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1000 * time.Millisecond}}
exp, err := newCarbonExporter(config, exportertest.NewNopCreateSettings())
cs := newCarbonServer(t, addr)
// Each metric point will generate one Carbon line, set up the wait
// for all of them.
cs.start(t, tt.numProducers*tt.writesPerProducer*tt.md.DataPointCount())

exp, err := newCarbonExporter(
&Config{
TCPAddr: confignet.TCPAddr{Endpoint: addr},
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 5 * time.Second},
},
exportertest.NewNopCreateSettings())
require.NoError(t, err)

require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))

if !tt.createServer {
require.Error(t, exp.ConsumeMetrics(context.Background(), tt.md))
assert.NoError(t, exp.Shutdown(context.Background()))
return
startCh := make(chan struct{})
var writersWG sync.WaitGroup
writersWG.Add(tt.numProducers)
for i := 0; i < tt.numProducers; i++ {
go func() {
defer writersWG.Done()
<-startCh
for j := 0; j < tt.writesPerProducer; j++ {
require.NoError(t, exp.ConsumeMetrics(context.Background(), tt.md))
}
}()
}

if !tt.acceptClient {
// Due to differences between platforms, it is not certain whether the call to ConsumeMetrics below will produce an error or not.
// See comment about recvfrom at connPool.Write for detailed information.
_ = exp.ConsumeMetrics(context.Background(), tt.md)
assert.NoError(t, exp.Shutdown(context.Background()))
return
}
// Release all senders.
close(startCh)
// Wait for all senders to finish.
writersWG.Wait()

// Each metric point will generate one Carbon line, set up the wait
// for all of them.
var wg sync.WaitGroup
wg.Add(tt.md.DataPointCount())
go func() {
assert.NoError(t, ln.SetDeadline(time.Now().Add(time.Second)))
conn, err := ln.AcceptTCP()
require.NoError(t, err)
defer conn.Close()
assert.NoError(t, exp.Shutdown(context.Background()))
cs.shutdownAndVerify(t)
})
}
}

reader := bufio.NewReader(conn)
for {
// Actual metric validation is done by other tests, here it
// is just flow.
_, err := reader.ReadBytes(byte('\n'))
if err != nil && !errors.Is(err, io.EOF) {
assert.NoError(t, err) // Just to print any error
}
func generateSmallBatch() pmetric.Metrics {
metrics := pmetric.NewMetrics()
m := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
m.SetName("test_gauge")
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.Attributes().PutStr("k0", "v0")
dp.Attributes().PutStr("k1", "v1")
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
dp.SetDoubleValue(123)
return metrics
}

if errors.Is(err, io.EOF) {
break
}
wg.Done()
}
}()
func generateLargeBatch() pmetric.Metrics {
ts := time.Now()
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
rm.Resource().Attributes().PutStr(conventions.AttributeServiceName, "test_carbon")
ms := rm.ScopeMetrics().AppendEmpty().Metrics()

<-time.After(100 * time.Millisecond)
for i := 0; i < 1028; i++ {
m := ms.AppendEmpty()
m.SetName("test_" + strconv.Itoa(i))
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.Attributes().PutStr("k0", "v0")
dp.Attributes().PutStr("k1", "v1")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.SetIntValue(int64(i))
}

require.NoError(t, exp.ConsumeMetrics(context.Background(), tt.md))
assert.NoError(t, exp.Shutdown(context.Background()))
return metrics
}

wg.Wait()
})
}
type carbonServer struct {
ln *net.TCPListener
doneServer *atomic.Bool
wg sync.WaitGroup
}

// Other tests didn't cover the concurrency aspect of connPool; this test
// is designed to force that.
func Test_connPool_Concurrency(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipping test on windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10147")
}
addr := testutil.GetAvailableLocalAddress(t)
func newCarbonServer(t *testing.T, addr string) *carbonServer {
laddr, err := net.ResolveTCPAddr("tcp", addr)
require.NoError(t, err)
ln, err := net.ListenTCP("tcp", laddr)
require.NoError(t, err)
defer ln.Close()

startCh := make(chan struct{})

cp := newTCPConnPool(addr, 500*time.Millisecond)
sender := carbonSender{connPool: cp}
ctx := context.Background()
md := generateLargeBatch()
concurrentWriters := 3
writesPerRoutine := 3

doneFlag := &atomic.Bool{}
defer func() {
doneFlag.Store(true)
}()
return &carbonServer{
ln: ln,
doneServer: &atomic.Bool{},
}
}

var recvWG sync.WaitGroup
recvWG.Add(concurrentWriters * writesPerRoutine * md.MetricCount())
func (cs *carbonServer) start(t *testing.T, numExpectedReq int) {
cs.wg.Add(numExpectedReq)
go func() {
for {
conn, err := ln.AcceptTCP()
if doneFlag.Load() {
conn, err := cs.ln.Accept()
if cs.doneServer.Load() {
// Close is expected to cause error.
return
}
require.NoError(t, err)
go func(conn *net.TCPConn) {
defer conn.Close()
go func(conn net.Conn) {
defer func() {
require.NoError(t, conn.Close())
}()

reader := bufio.NewReader(conn)
for {
// Actual metric validation is done by other tests, here it
// is just flow.
_, err := reader.ReadBytes(byte('\n'))
if err != nil && !errors.Is(err, io.EOF) {
assert.NoError(t, err) // Just to print any error
}

if errors.Is(err, io.EOF) {
break
return
}
recvWG.Done()
require.NoError(t, err)

cs.wg.Done()
}
}(conn)
}
}()

var writersWG sync.WaitGroup
for i := 0; i < concurrentWriters; i++ {
writersWG.Add(1)
go func() {
<-startCh
for i := 0; i < writesPerRoutine; i++ {
assert.NoError(t, sender.pushMetricsData(ctx, md))
}
writersWG.Done()
}()
}

close(startCh) // Release all workers
writersWG.Wait()
assert.NoError(t, sender.Shutdown(context.Background()))

recvWG.Wait()
<-time.After(100 * time.Millisecond)
}

func generateLargeBatch() pmetric.Metrics {
ts := time.Now()
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
rm.Resource().Attributes().PutStr(conventions.AttributeServiceName, "test_carbon")
ms := rm.ScopeMetrics().AppendEmpty().Metrics()

for i := 0; i < 65000; i++ {
m := ms.AppendEmpty()
m.SetName("test_" + strconv.Itoa(i))
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.Attributes().PutStr("k0", "v0")
dp.Attributes().PutStr("k1", "v1")
dp.SetTimestamp(pcommon.NewTimestampFromTime(ts))
dp.SetIntValue(int64(i))
}

return metrics
func (cs *carbonServer) shutdownAndVerify(t *testing.T) {
cs.wg.Wait()
cs.doneServer.Store(true)
require.NoError(t, cs.ln.Close())
}
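
For readers who want the new test's synchronization pattern in isolation, here is a minimal, self-contained sketch (not part of the commit) using only the Go standard library: a TCP listener counts newline-terminated lines with a sync.WaitGroup sized to the expected total, while concurrent producers are released together through a start channel, mirroring carbonServer.start and the writer loop in TestConsumeMetrics. The address, payload format, and counts below are illustrative choices, not values taken from the exporter.

package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
)

func main() {
	const producers, linesPerProducer = 3, 4

	// Listener on an ephemeral port; the real test uses testutil.GetAvailableLocalAddress instead.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	var received sync.WaitGroup
	received.Add(producers * linesPerProducer)

	// Accept connections and count every newline-terminated line that arrives.
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // listener closed, shut the acceptor down
			}
			go func(c net.Conn) {
				defer c.Close()
				r := bufio.NewReader(c)
				for {
					if _, err := r.ReadBytes('\n'); err != nil {
						if !errors.Is(err, io.EOF) {
							fmt.Println("read error:", err)
						}
						return
					}
					received.Done()
				}
			}(conn)
		}
	}()

	// Gate all producers on one channel so their writes overlap, as in the test.
	start := make(chan struct{})
	var writers sync.WaitGroup
	writers.Add(producers)
	for i := 0; i < producers; i++ {
		go func(id int) {
			defer writers.Done()
			<-start
			conn, err := net.Dial("tcp", ln.Addr().String())
			if err != nil {
				panic(err)
			}
			defer conn.Close()
			for j := 0; j < linesPerProducer; j++ {
				// Illustrative Carbon plaintext line: <path> <value> <timestamp>.
				fmt.Fprintf(conn, "test.metric %d 1700000000\n", id*linesPerProducer+j)
			}
		}(i)
	}

	close(start)    // release all producers
	writers.Wait()  // all lines written
	received.Wait() // all lines observed by the listener
	fmt.Println("received all", producers*linesPerProducer, "lines")
}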
