From 2a7c16599c5a0161b5a950987dd2b2ba09703604 Mon Sep 17 00:00:00 2001
From: Bogdan Drutu
Date: Tue, 19 Dec 2023 13:26:09 -0800
Subject: [PATCH] Add ability to configure maxIdleConnection for CarbonExporter (#30109)

Signed-off-by: Bogdan Drutu
---
 .chloggen/carbonexportermax.yaml             |  22 +++
 exporter/carbonexporter/config.go            |  14 +-
 exporter/carbonexporter/config_test.go       |  10 +
 exporter/carbonexporter/exporter.go          | 195 +++++++++++--------
 exporter/carbonexporter/exporter_test.go     | 126 +++++++++++-
 exporter/carbonexporter/factory.go           |   6 +
 exporter/carbonexporter/testdata/config.yaml |   1 +
 7 files changed, 285 insertions(+), 89 deletions(-)
 create mode 100755 .chloggen/carbonexportermax.yaml

diff --git a/.chloggen/carbonexportermax.yaml b/.chloggen/carbonexportermax.yaml
new file mode 100755
index 000000000000..7c4510e620d8
--- /dev/null
+++ b/.chloggen/carbonexportermax.yaml
@@ -0,0 +1,22 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: 'enhancement'
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: carbonexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Add ability to configure max_idle_conns"
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [30109]
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/carbonexporter/config.go b/exporter/carbonexporter/config.go
index 535cae47d4d9..90ed96f719be 100644
--- a/exporter/carbonexporter/config.go
+++ b/exporter/carbonexporter/config.go
@@ -14,15 +14,13 @@ import (
     "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
 )
 
-// Defaults for not specified configuration settings.
-const (
-    defaultEndpoint = "localhost:2003"
-)
-
 // Config defines configuration for Carbon exporter.
 type Config struct {
     // Specifies the connection endpoint config. The default value is "localhost:2003".
     confignet.TCPAddr `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
 
+    // MaxIdleConns limits the number of idle TCP connections the client can keep open. The default value is 100.
+    // If `sending_queue` is enabled, it is recommended to use the same value as `sending_queue::num_consumers`.
+    MaxIdleConns int `mapstructure:"max_idle_conns"`
+
     // Timeout is the maximum duration allowed to connecting and sending the
     // data to the Carbon/Graphite backend. The default value is 5s.
@@ -43,7 +41,11 @@ func (cfg *Config) Validate() error {
     // Negative timeouts are not acceptable, since all sends will fail.
     if cfg.Timeout < 0 {
-        return errors.New("exporter requires a positive timeout")
+        return errors.New("'timeout' must be non-negative")
+    }
+
+    if cfg.MaxIdleConns < 0 {
+        return errors.New("'max_idle_conns' must be non-negative")
     }
 
     return nil
diff --git a/exporter/carbonexporter/config_test.go b/exporter/carbonexporter/config_test.go
index 8a41cd8445f1..1bf4dcc7c7be 100644
--- a/exporter/carbonexporter/config_test.go
+++ b/exporter/carbonexporter/config_test.go
@@ -41,6 +41,7 @@ func TestLoadConfig(t *testing.T) {
                 TCPAddr: confignet.TCPAddr{
                     Endpoint: "localhost:8080",
                 },
+                MaxIdleConns: 15,
                 TimeoutSettings: exporterhelper.TimeoutSettings{
                     Timeout: 10 * time.Second,
                 },
@@ -101,12 +102,21 @@ func TestValidateConfig(t *testing.T) {
         {
             name: "invalid_timeout",
             config: &Config{
+                TCPAddr: confignet.TCPAddr{Endpoint: defaultEndpoint},
                 TimeoutSettings: exporterhelper.TimeoutSettings{
                     Timeout: -5 * time.Second,
                 },
             },
             wantErr: true,
         },
+        {
+            name: "invalid_max_idle_conns",
+            config: &Config{
+                TCPAddr:      confignet.TCPAddr{Endpoint: defaultEndpoint},
+                MaxIdleConns: -1,
+            },
+            wantErr: true,
+        },
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
diff --git a/exporter/carbonexporter/exporter.go b/exporter/carbonexporter/exporter.go
index 60026b6141b7..3414833e6e8f 100644
--- a/exporter/carbonexporter/exporter.go
+++ b/exporter/carbonexporter/exporter.go
@@ -5,11 +5,11 @@ package carbonexporter // import "github.com/open-telemetry/opentelemetry-collec
 
 import (
     "context"
-    "io"
     "net"
     "sync"
     "time"
 
+    "go.opentelemetry.io/collector/config/confignet"
     "go.opentelemetry.io/collector/exporter"
     "go.opentelemetry.io/collector/exporter/exporterhelper"
     "go.opentelemetry.io/collector/pdata/pmetric"
@@ -21,7 +21,8 @@ import (
 // newCarbonExporter returns a new Carbon exporter.
 func newCarbonExporter(ctx context.Context, cfg *Config, set exporter.CreateSettings) (exporter.Metrics, error) {
     sender := carbonSender{
-        writer: newTCPConnPool(cfg.Endpoint, cfg.Timeout),
+        writeTimeout: cfg.Timeout,
+        conns:        newConnPool(cfg.TCPAddr, cfg.Timeout, cfg.MaxIdleConns),
     }
 
     exp, err := exporterhelper.NewMetricsExporter(
@@ -44,80 +45,13 @@ func newCarbonExporter(ctx context.Context, cfg *Config, set exporter.CreateSett
 // connections into an implementations of exporterhelper.PushMetricsData so
 // the exporter can leverage the helper and get consistent observability.
 type carbonSender struct {
-    writer io.WriteCloser
+    writeTimeout time.Duration
+    conns        connPool
 }
 
 func (cs *carbonSender) pushMetricsData(_ context.Context, md pmetric.Metrics) error {
     lines := metricDataToPlaintext(md)
 
-    if _, err := cs.writer.Write([]byte(lines)); err != nil {
-        // Use the sum of converted and dropped since the write failed for all.
-        return err
-    }
-
-    return nil
-}
-
-func (cs *carbonSender) Shutdown(context.Context) error {
-    return cs.writer.Close()
-}
-
-// connPool is a very simple implementation of a pool of net.TCPConn instances.
-// The implementation hides the pool and exposes a Write and Close methods.
-// It leverages the prior art from SignalFx Gateway (see
-// https://github.com/signalfx/gateway/blob/master/protocol/carbon/conn_pool.go
-// but not its implementation).
-//
-// It keeps an unbounded "stack" of TCPConn instances always "popping" the most
-// recently returned to the pool. There is no accounting to terminating old
-// unused connections as that was the case on the prior art mentioned above.
-type connPool struct {
-    mtx      sync.Mutex
-    conns    []net.Conn
-    endpoint string
-    timeout  time.Duration
-}
-
-func newTCPConnPool(
-    endpoint string,
-    timeout time.Duration,
-) io.WriteCloser {
-    return &connPool{
-        endpoint: endpoint,
-        timeout:  timeout,
-    }
-}
-
-func (cp *connPool) Write(bytes []byte) (int, error) {
-    var conn net.Conn
-    var err error
-
-    start := time.Now()
-    cp.mtx.Lock()
-    lastIdx := len(cp.conns) - 1
-    if lastIdx >= 0 {
-        conn = cp.conns[lastIdx]
-        cp.conns = cp.conns[0:lastIdx]
-    }
-    cp.mtx.Unlock()
-    if conn == nil {
-        if conn, err = cp.createTCPConn(); err != nil {
-            return 0, err
-        }
-    }
-
-    // The deferred function below is what puts back connections on the pool if no error.
-    defer func() {
-        if err != nil {
-            // err already not nil, so will not influence retry logic because of the connection close.
-            err = multierr.Append(err, conn.Close())
-            return
-        }
-        cp.mtx.Lock()
-        cp.conns = append(cp.conns, conn)
-        cp.mtx.Unlock()
-    }()
-
     // There is no way to do a call equivalent to recvfrom with an empty buffer
     // to check if the connection was terminated (if the size of the buffer is
     // 0 the Read call doesn't call lower level). So due to buffer sizes it is
@@ -136,17 +70,120 @@ func (cp *connPool) Write(bytes []byte) (int, error) {
     // facts this "workaround" is not being added at this moment. If it is
     // needed in some scenarios the workaround should be validated on other
     // platforms and offered as a configuration setting.
+    conn, err := cs.conns.get()
+    if err != nil {
+        return err
+    }
+
+    if err = conn.SetWriteDeadline(time.Now().Add(cs.writeTimeout)); err != nil {
+        // Do not re-enqueue the connection since it failed to set a deadline.
+        return multierr.Append(err, conn.Close())
+    }
+
+    // If not all bytes are written, Write returns an error, so there is no need to check the written count.
+    _, err = conn.Write([]byte(lines))
+    if err != nil {
+        // Do not re-enqueue the connection since it failed to write.
+        return multierr.Append(err, conn.Close())
+    }
+
+    // Return the connection to the pool; the pool closes it if the idle limit has been reached.
+    cs.conns.put(conn)
+    return nil
+}
+
+func (cs *carbonSender) Shutdown(context.Context) error {
+    return cs.conns.close()
+}
+
+// connPool is a very simple implementation of a pool of net.Conn instances.
+type connPool interface {
+    get() (net.Conn, error)
+    put(conn net.Conn)
+    close() error
+}
+
+func newConnPool(
+    tcpConfig confignet.TCPAddr,
+    timeout time.Duration,
+    maxIdleConns int,
+) connPool {
+    if maxIdleConns == 0 {
+        return &nopConnPool{
+            timeout:   timeout,
+            tcpConfig: tcpConfig,
+        }
+    }
+    return &connPoolWithIdle{
+        timeout:      timeout,
+        tcpConfig:    tcpConfig,
+        maxIdleConns: maxIdleConns,
+    }
+}
+
+// nopConnPool is a very simple implementation that does not cache any net.Conn.
+type nopConnPool struct {
+    timeout   time.Duration
+    tcpConfig confignet.TCPAddr
+}
+
+func (cp *nopConnPool) get() (net.Conn, error) {
+    return createTCPConn(cp.tcpConfig, cp.timeout)
+}
+
+func (cp *nopConnPool) put(conn net.Conn) {
+    _ = conn.Close()
+}
 
-    if err = conn.SetWriteDeadline(start.Add(cp.timeout)); err != nil {
-        return 0, err
+func (cp *nopConnPool) close() error {
+    return nil
+}
+
+// connPoolWithIdle is a very simple implementation of a pool of net.Conn instances.
+//
+// It keeps at most maxIdleConns net.Conn instances and always pops the most
+// recently returned connection first. There is no logic to terminate old
+// unused connections.
+type connPoolWithIdle struct {
+    timeout      time.Duration
+    maxIdleConns int
+    mtx          sync.Mutex
+    conns        []net.Conn
+    tcpConfig    confignet.TCPAddr
+}
+
+func (cp *connPoolWithIdle) get() (net.Conn, error) {
+    if conn := cp.getFromCache(); conn != nil {
+        return conn, nil
+    }
+
+    return createTCPConn(cp.tcpConfig, cp.timeout)
+}
+
+func (cp *connPoolWithIdle) put(conn net.Conn) {
+    cp.mtx.Lock()
+    defer cp.mtx.Unlock()
+    // Do not cache if above limit.
+    if len(cp.conns) > cp.maxIdleConns {
+        _ = conn.Close()
+        return
     }
+    cp.conns = append(cp.conns, conn)
+}
 
-    var n int
-    n, err = conn.Write(bytes)
-    return n, err
+func (cp *connPoolWithIdle) getFromCache() net.Conn {
+    cp.mtx.Lock()
+    defer cp.mtx.Unlock()
+    lastIdx := len(cp.conns) - 1
+    if lastIdx < 0 {
+        return nil
+    }
+    conn := cp.conns[lastIdx]
+    cp.conns = cp.conns[0:lastIdx]
+    return conn
 }
 
-func (cp *connPool) Close() error {
+func (cp *connPoolWithIdle) close() error {
     cp.mtx.Lock()
     defer cp.mtx.Unlock()
 
@@ -158,8 +195,8 @@ func (cp *connPool) Close() error {
     return errs
 }
 
-func (cp *connPool) createTCPConn() (net.Conn, error) {
-    c, err := net.DialTimeout("tcp", cp.endpoint, cp.timeout)
+func createTCPConn(tcpConfig confignet.TCPAddr, timeout time.Duration) (net.Conn, error) {
+    c, err := net.DialTimeout("tcp", tcpConfig.Endpoint, timeout)
     if err != nil {
         return nil, err
     }
diff --git a/exporter/carbonexporter/exporter_test.go b/exporter/carbonexporter/exporter_test.go
index 6d1a021486b5..471423ac08e2 100644
--- a/exporter/carbonexporter/exporter_test.go
+++ b/exporter/carbonexporter/exporter_test.go
@@ -32,14 +32,14 @@ import (
 
 func TestNewWithDefaultConfig(t *testing.T) {
     cfg := createDefaultConfig().(*Config)
-    got, err := newCarbonExporter(context.TODO(), cfg, exportertest.NewNopCreateSettings())
+    got, err := newCarbonExporter(context.Background(), cfg, exportertest.NewNopCreateSettings())
     assert.NotNil(t, got)
     assert.NoError(t, err)
 }
 
 func TestConsumeMetricsNoServer(t *testing.T) {
     exp, err := newCarbonExporter(
-        context.TODO(),
+        context.Background(),
         &Config{
             TCPAddr:         confignet.TCPAddr{Endpoint: testutil.GetAvailableLocalAddress(t)},
             TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 5 * time.Second},
@@ -59,7 +59,7 @@ func TestConsumeMetricsWithResourceToTelemetry(t *testing.T) {
     cs.start(t, 1)
 
     exp, err := newCarbonExporter(
-        context.TODO(),
+        context.Background(),
         &Config{
             TCPAddr:         confignet.TCPAddr{Endpoint: addr},
             TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 5 * time.Second},
@@ -124,9 +124,10 @@ func TestConsumeMetrics(t *testing.T) {
             cs.start(t, tt.numProducers*tt.writesPerProducer*tt.md.DataPointCount())
 
             exp, err := newCarbonExporter(
-                context.TODO(),
+                context.Background(),
                 &Config{
                     TCPAddr:         confignet.TCPAddr{Endpoint: addr},
+                    MaxIdleConns:    tt.numProducers,
                     TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 5 * time.Second},
                 },
                 exportertest.NewNopCreateSettings())
@@ -157,6 +158,123 @@ func TestConsumeMetrics(t *testing.T) {
     }
 }
 
+func TestNewConnectionPool(t *testing.T) {
+    assert.IsType(t, &nopConnPool{}, newConnPool(confignet.TCPAddr{Endpoint: defaultEndpoint}, 10*time.Second, 0))
+    assert.IsType(t, &connPoolWithIdle{}, newConnPool(confignet.TCPAddr{Endpoint: defaultEndpoint}, 10*time.Second, 10))
+}
+
+func TestNopConnPool(t *testing.T) {
+    addr := testutil.GetAvailableLocalAddress(t)
+    cs := newCarbonServer(t, addr, "")
+    // Each metric point will generate one Carbon line, set up the wait
+    // for all of them.
+    cs.start(t, 2)
+
+    cp := &nopConnPool{
+        timeout:   1 * time.Second,
+        tcpConfig: confignet.TCPAddr{Endpoint: addr},
+    }
+
+    conn, err := cp.get()
+    require.NoError(t, err)
+    _, err = conn.Write([]byte(metricDataToPlaintext(generateSmallBatch())))
+    assert.NoError(t, err)
+    cp.put(conn)
+
+    // Get a new connection and confirm it is not the same as the first one.
+    conn2, err2 := cp.get()
+    require.NoError(t, err2)
+    assert.NotSame(t, conn, conn2)
+    _, err = conn2.Write([]byte(metricDataToPlaintext(generateSmallBatch())))
+    assert.NoError(t, err)
+    cp.put(conn2)
+
+    require.NoError(t, cp.close())
+    cs.shutdownAndVerify(t)
+}
+
+func TestConnPoolWithIdle(t *testing.T) {
+    addr := testutil.GetAvailableLocalAddress(t)
+    cs := newCarbonServer(t, addr, "")
+    // Each metric point will generate one Carbon line, set up the wait
+    // for all of them.
+    cs.start(t, 2)
+
+    cp := &connPoolWithIdle{
+        timeout:      1 * time.Second,
+        tcpConfig:    confignet.TCPAddr{Endpoint: addr},
+        maxIdleConns: 4,
+    }
+
+    conn, err := cp.get()
+    require.NoError(t, err)
+    _, err = conn.Write([]byte(metricDataToPlaintext(generateSmallBatch())))
+    assert.NoError(t, err)
+    cp.put(conn)
+
+    // Get a new connection and confirm it is the same as the first one.
+    conn2, err2 := cp.get()
+    require.NoError(t, err2)
+    assert.Same(t, conn, conn2)
+    _, err = conn2.Write([]byte(metricDataToPlaintext(generateSmallBatch())))
+    assert.NoError(t, err)
+    cp.put(conn2)
+
+    require.NoError(t, cp.close())
+    cs.shutdownAndVerify(t)
+}
+
+func TestConnPoolWithIdleMaxConnections(t *testing.T) {
+    addr := testutil.GetAvailableLocalAddress(t)
+    cs := newCarbonServer(t, addr, "")
+    const maxIdleConns = 4
+    // Each metric point will generate one Carbon line, set up the wait
+    // for all of them.
+    cs.start(t, maxIdleConns+1)
+
+    cp := &connPoolWithIdle{
+        timeout:      1 * time.Second,
+        tcpConfig:    confignet.TCPAddr{Endpoint: addr},
+        maxIdleConns: maxIdleConns,
+    }
+
+    // Create maxIdleConns distinct connections, then return them all to the pool.
+    var conns []net.Conn
+    for i := 0; i < maxIdleConns; i++ {
+        conn, err := cp.get()
+        require.NoError(t, err)
+        conns = append(conns, conn)
+        if i != 0 {
+            assert.NotSame(t, conn, conns[i-1])
+        }
+    }
+    for _, conn := range conns {
+        cp.put(conn)
+    }
+
+    for i := 0; i < maxIdleConns+1; i++ {
+        conn, err := cp.get()
+        require.NoError(t, err)
+        _, err = conn.Write([]byte(metricDataToPlaintext(generateSmallBatch())))
+        assert.NoError(t, err)
+        if i != maxIdleConns {
+            assert.Same(t, conn, conns[maxIdleConns-i-1])
+        } else {
+            // This should be a new connection.
+            for _, cachedConn := range conns {
+                assert.NotSame(t, conn, cachedConn)
+            }
+            cp.put(conn)
+        }
+    }
+    for _, conn := range conns {
+        cp.put(conn)
+    }
+    require.NoError(t, cp.close())
+    cs.shutdownAndVerify(t)
+}
+
 func generateSmallBatch() pmetric.Metrics {
     return generateMetricsBatch(1)
 }
diff --git a/exporter/carbonexporter/factory.go b/exporter/carbonexporter/factory.go
index 62a1600e00d7..a4caffe372e7 100644
--- a/exporter/carbonexporter/factory.go
+++ b/exporter/carbonexporter/factory.go
@@ -14,6 +14,11 @@ import (
     "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/carbonexporter/internal/metadata"
 )
 
+// Defaults for not specified configuration settings.
+const (
+    defaultEndpoint = "localhost:2003"
+)
+
 // NewFactory creates a factory for Carbon exporter.
 func NewFactory() exporter.Factory {
     return exporter.NewFactory(
@@ -27,6 +32,7 @@ func createDefaultConfig() component.Config {
         TCPAddr: confignet.TCPAddr{
             Endpoint: defaultEndpoint,
         },
+        MaxIdleConns:    100,
         TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
         QueueConfig:     exporterhelper.NewDefaultQueueSettings(),
         RetryConfig:     exporterhelper.NewDefaultRetrySettings(),
diff --git a/exporter/carbonexporter/testdata/config.yaml b/exporter/carbonexporter/testdata/config.yaml
index 7e4d6f2e4c8d..a0d330fbf595 100644
--- a/exporter/carbonexporter/testdata/config.yaml
+++ b/exporter/carbonexporter/testdata/config.yaml
@@ -4,6 +4,7 @@ carbon/allsettings:
   # use endpoint to specify alternative destinations for the exporter,
   # the default is localhost:2003
   endpoint: localhost:8080
+  max_idle_conns: 15
   # timeout is the maximum duration allowed to connecting and sending the
   # data to the Carbon/Graphite backend.
   # The default is 5 seconds.
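
For reference, a minimal collector configuration exercising the new setting is sketched below. The concrete values are illustrative assumptions, not part of the patch; max_idle_conns is matched to sending_queue::num_consumers as recommended in the config.go comment above, so each queue consumer can reuse an idle connection instead of dialing a new one.

    carbon:
      # the default endpoint is localhost:2003
      endpoint: localhost:2003
      timeout: 10s
      # keep in line with sending_queue::num_consumers
      max_idle_conns: 10
      sending_queue:
        enabled: true
        num_consumers: 10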