Skip to content

Commit

Permalink
Add ability to configure maxIdleConnection for CarbonExporter (#30109)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Dec 19, 2023
1 parent 41a0629 commit 2a7c165
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 89 deletions.
22 changes: 22 additions & 0 deletions .chloggen/carbonexportermax.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: carbonexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add ability to configure max_idle_conns"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30109]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
14 changes: 8 additions & 6 deletions exporter/carbonexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
)

// Defaults for not specified configuration settings.
const (
defaultEndpoint = "localhost:2003"
)

// Config defines configuration for Carbon exporter.
type Config struct {
// Specifies the connection endpoint config. The default value is "localhost:2003".
confignet.TCPAddr `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
// MaxIdleConns limits the number of idle TCP connections the client can keep open. Default value is 100.
// If `sending_queue` is enabled, it is recommended to use the same value as `sending_queue::num_consumers`.
MaxIdleConns int `mapstructure:"max_idle_conns"`

// Timeout is the maximum duration allowed for connecting and sending the
// data to the Carbon/Graphite backend. The default value is 5s.
Expand All @@ -43,7 +41,11 @@ func (cfg *Config) Validate() error {

// Negative timeouts are not acceptable, since all sends will fail.
if cfg.Timeout < 0 {
return errors.New("exporter requires a positive timeout")
return errors.New("'timeout' must be non-negative")
}

if cfg.MaxIdleConns < 0 {
return errors.New("'max_idle_conns' must be non-negative")
}

return nil
Expand Down
10 changes: 10 additions & 0 deletions exporter/carbonexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestLoadConfig(t *testing.T) {
TCPAddr: confignet.TCPAddr{
Endpoint: "localhost:8080",
},
MaxIdleConns: 15,
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 10 * time.Second,
},
Expand Down Expand Up @@ -101,12 +102,21 @@ func TestValidateConfig(t *testing.T) {
{
name: "invalid_timeout",
config: &Config{
TCPAddr: confignet.TCPAddr{Endpoint: defaultEndpoint},
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: -5 * time.Second,
},
},
wantErr: true,
},
{
name: "invalid_max_idle_conns",
config: &Config{
TCPAddr: confignet.TCPAddr{Endpoint: defaultEndpoint},
MaxIdleConns: -1,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
195 changes: 116 additions & 79 deletions exporter/carbonexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package carbonexporter // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"io"
"net"
"sync"
"time"

"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -21,7 +21,8 @@ import (
// newCarbonExporter returns a new Carbon exporter.
func newCarbonExporter(ctx context.Context, cfg *Config, set exporter.CreateSettings) (exporter.Metrics, error) {
sender := carbonSender{
writer: newTCPConnPool(cfg.Endpoint, cfg.Timeout),
writeTimeout: cfg.Timeout,
conns: newConnPool(cfg.TCPAddr, cfg.Timeout, cfg.MaxIdleConns),
}

exp, err := exporterhelper.NewMetricsExporter(
Expand All @@ -44,80 +45,13 @@ func newCarbonExporter(ctx context.Context, cfg *Config, set exporter.CreateSett
// connections into an implementations of exporterhelper.PushMetricsData so
// the exporter can leverage the helper and get consistent observability.
type carbonSender struct {
writer io.WriteCloser
writeTimeout time.Duration
conns connPool
}

func (cs *carbonSender) pushMetricsData(_ context.Context, md pmetric.Metrics) error {
lines := metricDataToPlaintext(md)

if _, err := cs.writer.Write([]byte(lines)); err != nil {
// Use the sum of converted and dropped since the write failed for all.
return err
}

return nil
}

func (cs *carbonSender) Shutdown(context.Context) error {
return cs.writer.Close()
}

// connPool is a very simple implementation of a pool of net.TCPConn instances.
// The implementation hides the pool and exposes a Write and Close methods.
// It leverages the prior art from SignalFx Gateway (see
// https://github.com/signalfx/gateway/blob/master/protocol/carbon/conn_pool.go
// but not its implementation).
//
// It keeps an unbounded "stack" of TCPConn instances always "popping" the most
// recently returned to the pool. There is no accounting to terminating old
// unused connections as that was the case on the prior art mentioned above.
type connPool struct {
mtx sync.Mutex
conns []net.Conn
endpoint string
timeout time.Duration
}

func newTCPConnPool(
endpoint string,
timeout time.Duration,
) io.WriteCloser {
return &connPool{
endpoint: endpoint,
timeout: timeout,
}
}

func (cp *connPool) Write(bytes []byte) (int, error) {
var conn net.Conn
var err error

start := time.Now()
cp.mtx.Lock()
lastIdx := len(cp.conns) - 1
if lastIdx >= 0 {
conn = cp.conns[lastIdx]
cp.conns = cp.conns[0:lastIdx]
}
cp.mtx.Unlock()
if conn == nil {
if conn, err = cp.createTCPConn(); err != nil {
return 0, err
}
}

// The deferred function below is what puts back connections on the pool if no error.
defer func() {
if err != nil {
// err already not nil, so will not influence retry logic because of the connection close.
err = multierr.Append(err, conn.Close())
return
}
cp.mtx.Lock()
cp.conns = append(cp.conns, conn)
cp.mtx.Unlock()
}()

// There is no way to do a call equivalent to recvfrom with an empty buffer
// to check if the connection was terminated (if the size of the buffer is
// 0 the Read call doesn't call lower level). So due to buffer sizes it is
Expand All @@ -136,17 +70,120 @@ func (cp *connPool) Write(bytes []byte) (int, error) {
// facts this "workaround" is not being added at this moment. If it is
// needed in some scenarios the workaround should be validated on other
// platforms and offered as a configuration setting.
conn, err := cs.conns.get()
if err != nil {
return err
}

if err = conn.SetWriteDeadline(time.Now().Add(cs.writeTimeout)); err != nil {
// Do not re-enqueue the connection since it failed to set a deadline.
return multierr.Append(err, conn.Close())
}

// If we did not write all bytes will get an error, so no need to check for that.
_, err = conn.Write([]byte(lines))
if err != nil {
// Do not re-enqueue the connection since it failed to write.
return multierr.Append(err, conn.Close())
}

// Hand the connection back to the pool; the pool may still close it instead of caching it when the max idle connections limit is reached.
cs.conns.put(conn)
return nil
}

// Shutdown closes the connection pool held by the sender and returns whatever
// error the pool reports. The context argument is ignored.
func (cs *carbonSender) Shutdown(context.Context) error {
return cs.conns.close()
}

// connPool abstracts how the sender obtains and releases net.Conn instances.
// The implementations in this file either cache idle connections
// (connPoolWithIdle) or dial a new connection for every request (nopConnPool).
type connPool interface {
// get returns a connection to write to, or an error if none could be obtained.
get() (net.Conn, error)
// put hands a connection back to the pool; the pool decides whether to keep
// it for reuse or to close it.
put(conn net.Conn)
// close shuts the pool down and reports any error encountered while doing so.
close() error
}

// newConnPool builds the connPool implementation matching the idle limit.
//
// A maxIdleConns of zero disables caching and yields a nopConnPool, which dials
// a new connection on every get. Any other value yields a connPoolWithIdle that
// keeps returned connections around for reuse.
func newConnPool(
	tcpConfig confignet.TCPAddr,
	timeout time.Duration,
	maxIdleConns int,
) connPool {
	switch maxIdleConns {
	case 0:
		// Nothing may stay idle, so there is no point in tracking a cache.
		return &nopConnPool{
			tcpConfig: tcpConfig,
			timeout:   timeout,
		}
	default:
		return &connPoolWithIdle{
			tcpConfig:    tcpConfig,
			timeout:      timeout,
			maxIdleConns: maxIdleConns,
		}
	}
}

// nopConnPool is a very simple connPool implementation that does not cache any
// net.Conn: every get dials a new connection and every put closes it.
type nopConnPool struct {
// timeout is passed to createTCPConn whenever a connection is dialed.
timeout time.Duration
// tcpConfig holds the address that connections are dialed to.
tcpConfig confignet.TCPAddr
}

// get always dials a new connection through createTCPConn using the configured
// address and timeout; nothing is ever served from a cache.
func (cp *nopConnPool) get() (net.Conn, error) {
return createTCPConn(cp.tcpConfig, cp.timeout)
}

// put closes conn right away, since this pool never keeps idle connections.
func (cp *nopConnPool) put(conn net.Conn) {
// The close error is deliberately dropped: put has no error return to report it through.
_ = conn.Close()
}

if err = conn.SetWriteDeadline(start.Add(cp.timeout)); err != nil {
return 0, err
func (cp *nopConnPool) close() error {
return nil
}

// connPoolWithIdle is a very simple implementation of a pool of net.Conn
// instances that caches idle connections for reuse.
//
// Idle connections are kept on a stack whose size is bounded by maxIdleConns,
// and the connection most recently returned to the pool is always popped first.
// There is no accounting for terminating old unused connections.
type connPoolWithIdle struct {
// timeout is passed to createTCPConn when a new connection has to be dialed.
timeout time.Duration
// maxIdleConns is the limit that put consults before caching a returned connection.
maxIdleConns int
// mtx guards conns.
mtx sync.Mutex
// conns is the stack of idle connections; the last element is handed out first.
conns []net.Conn
// tcpConfig holds the address that new connections are dialed to.
tcpConfig confignet.TCPAddr
}

// get hands out a connection, preferring an idle one from the cache.
//
// When the cache is empty a new connection is dialed through createTCPConn and
// its result (connection or error) is returned unchanged.
func (cp *connPoolWithIdle) get() (net.Conn, error) {
	cached := cp.getFromCache()
	if cached == nil {
		// Cache miss: fall back to dialing.
		return createTCPConn(cp.tcpConfig, cp.timeout)
	}
	return cached, nil
}

// put returns conn to the pool so that a later get can reuse it.
//
// At most maxIdleConns connections are kept idle. Once the cache already holds
// that many, conn is closed instead of being stored.
func (cp *connPoolWithIdle) put(conn net.Conn) {
	cp.mtx.Lock()
	defer cp.mtx.Unlock()
	// Do not cache if at the limit. The comparison must be >= (not >), otherwise
	// the cache would grow to maxIdleConns+1 entries before connections get dropped.
	if len(cp.conns) >= cp.maxIdleConns {
		// Best-effort close: put has no error return to report a failure through.
		_ = conn.Close()
		return
	}
	cp.conns = append(cp.conns, conn)
}

var n int
n, err = conn.Write(bytes)
return n, err
// getFromCache pops the most recently cached idle connection, or returns nil
// when the cache is empty. The cache is accessed under mtx.
func (cp *connPoolWithIdle) getFromCache() net.Conn {
	cp.mtx.Lock()
	defer cp.mtx.Unlock()

	size := len(cp.conns)
	if size == 0 {
		return nil
	}
	// Take the top of the stack and shrink the slice by one.
	top := cp.conns[size-1]
	cp.conns = cp.conns[:size-1]
	return top
}

func (cp *connPool) Close() error {
func (cp *connPoolWithIdle) close() error {
cp.mtx.Lock()
defer cp.mtx.Unlock()

Expand All @@ -158,8 +195,8 @@ func (cp *connPool) Close() error {
return errs
}

func (cp *connPool) createTCPConn() (net.Conn, error) {
c, err := net.DialTimeout("tcp", cp.endpoint, cp.timeout)
func createTCPConn(tcpConfig confignet.TCPAddr, timeout time.Duration) (net.Conn, error) {
c, err := net.DialTimeout("tcp", tcpConfig.Endpoint, timeout)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 2a7c165

Please sign in to comment.