Skip to content

Commit

Permalink
[exporter/elasticsearch] support both v7 & v8
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Jan 16, 2025
1 parent 0788185 commit d4af93a
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 28 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter-goelasticsearchv8.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Use go-elasticsearch/v8 by default, add config to fall back to v7

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32454]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
15 changes: 15 additions & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,21 @@ The Elasticsearch Exporter's own telemetry settings for testing and debugging pu
- `log_request_body` (default=false): Logs Elasticsearch client request body as a field in a log line at DEBUG level. It requires `service::telemetry::logs::level` to be set to `debug`. WARNING: Enabling this config may expose sensitive data.
- `log_response_body` (default=false): Logs Elasticsearch client response body as a field in a log line at DEBUG level. It requires `service::telemetry::logs::level` to be set to `debug`. WARNING: Enabling this config may expose sensitive data.

### Elasticsearch version compatibility

The Elasticsearch Exporter uses the [go-elasticsearch](https://github.com/elastic/go-elasticsearch)
client for communicating with Elasticsearch, and has forward compatibility with Elasticsearch 8+ by
default. It is possible to enable best-effort support for older Elasticsearch 7.x versions by
setting the Elasticsearch exporter config `version` to `7`.

Certain features of the exporter, such as the `otel` mapping mode`, may require newer versions of
Elasticsearch. In general it is recommended to use the exporter with the most recent supported,
as this will have the most in-depth testing.

> [!NOTE]
> See https://www.elastic.co/support/eol for Elasticsearch's End Of Life policy.
> Versions prior to 7.17.x are no longer supported by Elastic.

## Exporting metrics

Metrics support is currently in development.
Expand Down
10 changes: 5 additions & 5 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"time"

"github.com/elastic/go-docappender/v2"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v8/esapi"
"go.opentelemetry.io/collector/config/configcompression"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -55,14 +55,14 @@ type bulkIndexerSession interface {

const defaultMaxRetries = 2

func newBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (bulkIndexer, error) {
func newBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) (bulkIndexer, error) {
if config.Batcher.Enabled != nil {
return newSyncBulkIndexer(logger, client, config), nil
}
return newAsyncBulkIndexer(logger, client, config)
}

func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender.BulkIndexerConfig {
func bulkIndexerConfig(client esapi.Transport, config *Config) docappender.BulkIndexerConfig {
var maxDocRetries int
if config.Retry.Enabled {
maxDocRetries = defaultMaxRetries
Expand All @@ -84,7 +84,7 @@ func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender
}
}

func newSyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) *syncBulkIndexer {
func newSyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) *syncBulkIndexer {
return &syncBulkIndexer{
config: bulkIndexerConfig(client, config),
flushTimeout: config.Timeout,
Expand Down Expand Up @@ -175,7 +175,7 @@ func (s *syncBulkIndexerSession) Flush(ctx context.Context) error {
}
}

func newAsyncBulkIndexer(logger *zap.Logger, client *elasticsearch.Client, config *Config) (*asyncBulkIndexer, error) {
func newAsyncBulkIndexer(logger *zap.Logger, client esapi.Transport, config *Config) (*asyncBulkIndexer, error) {
numWorkers := config.NumWorkers
if numWorkers == 0 {
numWorkers = runtime.NumCPU()
Expand Down
17 changes: 7 additions & 10 deletions exporter/elasticsearchexporter/bulkindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"testing"
"time"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v8"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/confighttp"
Expand Down Expand Up @@ -293,15 +293,11 @@ func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) {
runBulkIndexerOnce(t, &tt.config, client)

records := logObserver.AllUntimed()
assert.Len(t, records, 2)
require.Len(t, records, 1)

assert.Equal(t, "/", records[0].ContextMap()["path"])
assert.Nil(t, records[0].ContextMap()["request_body"])
assert.Equal(t, "/_bulk", records[0].ContextMap()["path"])
assert.Equal(t, "{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n", records[0].ContextMap()["request_body"])
assert.JSONEq(t, successResp, records[0].ContextMap()["response_body"].(string))

assert.Equal(t, "/_bulk", records[1].ContextMap()["path"])
assert.Equal(t, "{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n", records[1].ContextMap()["request_body"])
assert.JSONEq(t, successResp, records[1].ContextMap()["response_body"].(string))
})
}
}
Expand All @@ -327,8 +323,9 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) {
reqCnt.Add(1)
}
return &http.Response{
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader(successResp)),
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader(successResp)),
StatusCode: http.StatusOK,
}, nil
},
}})
Expand Down
9 changes: 9 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ type Config struct {
// If Batcher.Enabled is non-nil (i.e. batcher::enabled is specified),
// then the Flush will be ignored even if Batcher.Enabled is false.
Batcher BatcherConfig `mapstructure:"batcher"`

// Version holds the major version of Elasticsearch that the exporter
// will target: 7 or 8.
Version int `mapstructure:"version"`
}

// BatcherConfig holds configuration for exporterbatcher.
Expand Down Expand Up @@ -279,6 +283,11 @@ func (cfg *Config) Validate() error {
if cfg.Retry.MaxRetries < 0 {
return errors.New("retry::max_retries should be non-negative")
}
switch cfg.Version {
case 7, 8:
default:
return fmt.Errorf("version must be 7 or 8, got %d", cfg.Version)
}

return nil
}
Expand Down
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func TestConfig(t *testing.T) {
MaxSizeItems: 0,
},
},
Version: 8,
},
},
{
Expand Down Expand Up @@ -191,6 +192,7 @@ func TestConfig(t *testing.T) {
MaxSizeItems: 0,
},
},
Version: 8,
},
},
{
Expand Down Expand Up @@ -262,6 +264,7 @@ func TestConfig(t *testing.T) {
MaxSizeItems: 0,
},
},
Version: 8,
},
},
{
Expand Down
81 changes: 69 additions & 12 deletions exporter/elasticsearchexporter/esclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/elastic/go-elasticsearch/v7"
elasticsearchv7 "github.com/elastic/go-elasticsearch/v7"
elasticsearchv8 "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/klauspost/compress/gzip"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
Expand Down Expand Up @@ -82,14 +86,14 @@ func (cl *clientLogger) ResponseBodyEnabled() bool {
return cl.logResponseBody
}

// newElasticsearchClient returns a new elasticsearch.Client
// newElasticsearchClient returns a new esapi.Transport.
func newElasticsearchClient(
ctx context.Context,
config *Config,
host component.Host,
telemetry component.TelemetrySettings,
userAgent string,
) (*elasticsearch.Client, error) {
) (esapi.Transport, error) {
httpClient, err := config.ClientConfig.ToClient(ctx, host, telemetry)
if err != nil {
return nil, err
Expand All @@ -105,19 +109,36 @@ func newElasticsearchClient(
return nil, err
}

esLogger := clientLogger{
esLogger := &clientLogger{
Logger: telemetry.Logger,
logRequestBody: config.LogRequestBody,
logResponseBody: config.LogResponseBody,
}

maxRetries := defaultMaxRetries
if config.Retry.MaxRetries != 0 {
maxRetries = config.Retry.MaxRetries
switch config.Version {
case 7:
return newElasticsearchClientV7(
config, endpoints, headers,
httpClient.Transport, esLogger,
)
case 8:
return newElasticsearchClientV8(
config, endpoints, headers,
httpClient.Transport, esLogger,
)
}
return nil, fmt.Errorf("unsupported version %d", config.Version)
}

return elasticsearch.NewClient(elasticsearch.Config{
Transport: httpClient.Transport,
func newElasticsearchClientV7(
config *Config,
endpoints []string,
headers http.Header,
httpTransport http.RoundTripper,
esLogger *clientLogger,
) (esapi.Transport, error) {
return elasticsearchv7.NewClient(elasticsearchv7.Config{
Transport: httpTransport,

// configure connection setup
Addresses: endpoints,
Expand All @@ -130,8 +151,44 @@ func newElasticsearchClient(
RetryOnStatus: config.Retry.RetryOnStatus,
DisableRetry: !config.Retry.Enabled,
EnableRetryOnTimeout: config.Retry.Enabled,
// RetryOnError: retryOnError, // should be used from esclient version 8 onwards
MaxRetries: maxRetries,
MaxRetries: min(defaultMaxRetries, config.Retry.MaxRetries),
RetryBackoff: createElasticsearchBackoffFunc(&config.Retry),

// configure sniffing
DiscoverNodesOnStart: config.Discovery.OnStart,
DiscoverNodesInterval: config.Discovery.Interval,

// configure internal metrics reporting and logging
EnableMetrics: false, // TODO
EnableDebugLogger: false, // TODO
Logger: esLogger,
})
}

func newElasticsearchClientV8(
config *Config,
endpoints []string,
headers http.Header,
httpTransport http.RoundTripper,
esLogger *clientLogger,
) (*elasticsearchv8.Client, error) {
return elasticsearchv8.NewClient(elasticsearchv8.Config{
Transport: httpTransport,

// configure connection setup
Addresses: endpoints,
Username: config.Authentication.User,
Password: string(config.Authentication.Password),
APIKey: string(config.Authentication.APIKey),
Header: headers,

// configure retry behavior
RetryOnStatus: config.Retry.RetryOnStatus,
DisableRetry: !config.Retry.Enabled,
RetryOnError: func(_ *http.Request, err error) bool {
return !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded)
},
MaxRetries: min(defaultMaxRetries, config.Retry.MaxRetries),
RetryBackoff: createElasticsearchBackoffFunc(&config.Retry),

// configure sniffing
Expand All @@ -141,7 +198,7 @@ func newElasticsearchClient(
// configure internal metrics reporting and logging
EnableMetrics: false, // TODO
EnableDebugLogger: false, // TODO
Logger: &esLogger,
Logger: esLogger,
})
}

Expand Down
1 change: 1 addition & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,7 @@ func TestExporterBatcher(t *testing.T) {
exporter := newUnstartedTestLogsExporter(t, "http://testing.invalid", func(cfg *Config) {
cfg.Batcher = BatcherConfig{Enabled: &batcherEnabled}
cfg.Auth = &configauth.Authentication{AuthenticatorID: testauthID}
cfg.Retry.Enabled = false
})
err := exporter.Start(context.Background(), &mockHost{
extensions: map[component.ID]component.Component{
Expand Down
1 change: 1 addition & 0 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func createDefaultConfig() component.Config {
Bytes: 5e+6,
Interval: 30 * time.Second,
},
Version: 8,
}
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/elastic/go-docappender/v2 v2.3.3
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/elastic/go-elasticsearch/v8 v8.17.0
github.com/elastic/go-structform v0.0.12
github.com/klauspost/compress v1.17.11
github.com/lestrrat-go/strftime v1.1.0
Expand Down Expand Up @@ -34,7 +35,6 @@ require (
github.com/armon/go-radix v1.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/elastic/go-elasticsearch/v8 v8.17.0 // indirect
github.com/elastic/go-sysinfo v1.7.1 // indirect
github.com/elastic/go-windows v1.0.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand Down

0 comments on commit d4af93a

Please sign in to comment.