[exporter/elasticsearch] Logstash format compatibility (#29625)
**Description:** Logstash format compatibility. Traces or logs data can be written into an index in Logstash format.

**Link to tracking Issue:** Closes #29624

**Documentation:** Added descriptions of the `logstash_format` configuration options.
1. Example `otel-col.yaml`:
```yaml
receivers:
  otlp:
    protocols:
      grpc:
  filelog:
    include: [ ./examples/kubernetes/varlogpods/containerd_logs-0_000011112222333344445555666677778888/logs/0.log ]
    start_at: beginning
    operators:
      # Find out which format is used by kubernetes
      - type: router
        id: get-format
        routes:
          - output: parser-docker
            expr: 'body matches "^\\{"'
          - output: parser-crio
            expr: 'body matches "^[^ Z]+ "'
          - output: parser-containerd
            expr: 'body matches "^[^ Z]+Z"'
      # Parse CRI-O format
      - type: regex_parser
        id: parser-crio
        regex: '^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$'
        output: extract_metadata_from_filepath
        timestamp:
          parse_from: attributes.time
          layout_type: gotime
          layout: '2006-01-02T15:04:05.999999999Z07:00'
      # Parse CRI-Containerd format
      - type: regex_parser
        id: parser-containerd
        regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$'
        output: extract_metadata_from_filepath
        timestamp:
          parse_from: attributes.time
          layout: '%Y-%m-%dT%H:%M:%S.%LZ'
      # Parse Docker format
      - type: json_parser
        id: parser-docker
        output: extract_metadata_from_filepath
        timestamp:
          parse_from: attributes.time
          layout: '%Y-%m-%dT%H:%M:%S.%LZ'
      # Extract metadata from file path
      - type: regex_parser
        id: extract_metadata_from_filepath
        regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$'
        parse_from: attributes["log.file.path"]
        cache:
          size: 128  # default maximum amount of Pods per Node is 110
      # Update body field after finishing all parsing
      - type: move
        from: attributes.log
        to: body
      # Rename attributes
      - type: move
        from: attributes.stream
        to: attributes["log.iostream"]
      - type: move
        from: attributes.container_name
        to: resource["k8s.container.name"]
      - type: move
        from: attributes.namespace
        to: resource["k8s.namespace.name"]
      - type: move
        from: attributes.pod_name
        to: resource["k8s.pod.name"]
      - type: move
        from: attributes.restart_count
        to: resource["k8s.container.restart_count"]
      - type: move
        from: attributes.uid
        to: resource["k8s.pod.uid"]
exporters:
  prometheus:
    endpoint: "0.0.0.0:8889"
    const_labels:
      label1: value1

  elasticsearch/log:
    tls:
      insecure: false
    endpoints: [http://localhost:9200]
    logs_index: otlp-logs
    logstash_format:
      enabled: true
    timeout: 2m
    flush:
      bytes: 10485760
    retry:
      max_requests: 5
    sending_queue:
      enabled: true
  elasticsearch/traces:
    tls:
      insecure: false
    endpoints: [http://localhost:9200]
    traces_index: otlp-traces
    logstash_format:
      enabled: true
    timeout: 2m
    flush:
      bytes: 10485760
    retry:
      max_requests: 5
    sending_queue:
      enabled: true

  debug:

processors:
  batch:

extensions:
  health_check:
  pprof:
    endpoint: :1888
  zpages:
    endpoint: :55679

service:
  extensions: [pprof, zpages, health_check]
  pipelines:
    logs:
      receivers: [otlp,filelog]
      processors: [batch]
      exporters: [debug, elasticsearch/log]
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug, elasticsearch/traces]

```
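With `logstash_format.enabled: true` as in the config above, the exporter appends the current date to `logs_index`/`traces_index`, so `otlp-logs` becomes e.g. `otlp-logs-2023.12.08`. A minimal sketch of the naming scheme, assuming the semantics of `github.com/lestrrat-go/strftime` that this PR adds to `go.mod` (the helper signature here is illustrative, not the exporter's exact one):

```go
package main

import (
	"fmt"
	"time"

	"github.com/lestrrat-go/strftime"
)

// generateIndexWithLogstashFormat sketches the naming scheme this PR adds;
// the exporter's real helper may differ in signature.
func generateIndexWithLogstashFormat(index, separator, dateFormat string, t time.Time) (string, error) {
	// Render the date part with strftime, e.g. "%Y.%m.%d" -> "2023.12.08".
	date, err := strftime.Format(dateFormat, t)
	if err != nil {
		return "", err
	}
	return index + separator + date, nil
}

func main() {
	// Defaults from this PR: prefix_separator "-", date_format "%Y.%m.%d".
	name, _ := generateIndexWithLogstashFormat("otlp-logs", "-", "%Y.%m.%d", time.Now())
	fmt.Println(name) // e.g. otlp-logs-2023.12.08
}
```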
2. Elasticsearch indices created when `otel-col` writes traces and logs:
<img width="913" alt="image"
src="https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/12468337/0ede0fd7-ed85-4fd4-b843-093c13edc1e3">

3. Query index data:
<img width="743" alt="image"
src="https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/12468337/1e89a44c-cead-4aab-8b3a-284a8b573d3b">
<img width="817" alt="image"
src="https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/12468337/429c25bc-336e-4850-9d83-ed7423f38e90">

---------

Signed-off-by: Jared Tan <jian.tan@daocloud.io>
JaredTan95 authored Dec 8, 2023
1 parent 7d614d7 commit a9d4196
Showing 20 changed files with 350 additions and 26 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_support_logstash_index_format.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Logstash format compatibility. Traces or Logs data can be written into an index in logstash format.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29624]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions cmd/configschema/go.mod
@@ -436,6 +436,7 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b // indirect
github.com/leoluk/perflib_exporter v0.2.1 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/lightstep/go-expohisto v1.0.0 // indirect
github.com/linkedin/goavro/v2 v2.9.8 // indirect
4 changes: 4 additions & 0 deletions cmd/configschema/go.sum


1 change: 1 addition & 0 deletions cmd/otelcontribcol/go.mod
@@ -484,6 +484,7 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect
github.com/leoluk/perflib_exporter v0.2.1 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/lightstep/go-expohisto v1.0.0 // indirect
github.com/linkedin/goavro/v2 v2.9.8 // indirect
4 changes: 4 additions & 0 deletions cmd/otelcontribcol/go.sum


6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/README.md
@@ -44,6 +44,12 @@ This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www.
takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `traces_index`. (priority: resource attribute > span attribute)
- `enabled`(default=false): Enable/Disable dynamic index for trace spans
- `logstash_format` (optional): Logstash format compatibility. Traces or logs data can be written into an index in Logstash format.
- `enabled`(default=false): Enable/Disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `traces/logs_index` or `traces/logs_dynamic_index` as the prefix and the date as the suffix,
e.g.: if `traces/logs_index` or `traces/logs_dynamic_index` equals `otlp-generic-default`, the index becomes `otlp-generic-default-YYYY.MM.DD`, where the appended date is the date the data was generated.
- `prefix_separator`(default=`-`): Set a separator between the Logstash prefix and the date.
- `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) used to generate the second part of the index name.
- `pipeline` (optional): Optional [Ingest Node](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html)
pipeline ID used for processing documents published by the exporter.
- `flush`: Event bulk buffer flush settings
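The `prefix_separator` and `date_format` options documented above compose the suffix; a minimal sketch with non-default values (illustrative only, assuming the same `strftime` library):

```go
package main

import (
	"fmt"
	"time"

	"github.com/lestrrat-go/strftime"
)

func main() {
	// Illustrative overrides, not the defaults:
	// prefix_separator: "_", date_format: "%Y-%m-%d".
	date, err := strftime.Format("%Y-%m-%d", time.Now())
	if err != nil {
		panic(err)
	}
	fmt.Println("otlp-generic-default" + "_" + date) // e.g. otlp-generic-default_2023-12-08
}
```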
15 changes: 11 additions & 4 deletions exporter/elasticsearchexporter/config.go
@@ -57,10 +57,17 @@ type Config struct {
Pipeline string `mapstructure:"pipeline"`

HTTPClientSettings `mapstructure:",squash"`
Discovery DiscoverySettings `mapstructure:"discover"`
Retry RetrySettings `mapstructure:"retry"`
Flush FlushSettings `mapstructure:"flush"`
Mapping MappingsSettings `mapstructure:"mapping"`
Discovery DiscoverySettings `mapstructure:"discover"`
Retry RetrySettings `mapstructure:"retry"`
Flush FlushSettings `mapstructure:"flush"`
Mapping MappingsSettings `mapstructure:"mapping"`
LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"`
}

type LogstashFormatSettings struct {
Enabled bool `mapstructure:"enabled"`
PrefixSeparator string `mapstructure:"prefix_separator"`
DateFormat string `mapstructure:"date_format"`
}

type DynamicIndexSetting struct {
24 changes: 24 additions & 0 deletions exporter/elasticsearchexporter/config_test.go
@@ -67,6 +67,11 @@ func TestLoad_DeprecatedIndexConfigOption(t *testing.T) {
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
})
}

@@ -76,6 +81,10 @@ func TestLoadConfig(t *testing.T) {
defaultCfg := createDefaultConfig()
defaultCfg.(*Config).Endpoints = []string{"https://elastic.example.com:9200"}

defaultLogstashFormatCfg := createDefaultConfig()
defaultLogstashFormatCfg.(*Config).Endpoints = []string{"http://localhost:9200"}
defaultLogstashFormatCfg.(*Config).LogstashFormat.Enabled = true

tests := []struct {
configFile string
id component.ID
@@ -129,6 +138,11 @@
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
},
},
{
@@ -174,8 +188,18 @@
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
},
},
{
id: component.NewIDWithName(metadata.Type, "logstash_format"),
configFile: "config.yaml",
expected: defaultLogstashFormatCfg,
},
}

for _, tt := range tests {
21 changes: 13 additions & 8 deletions exporter/elasticsearchexporter/factory.go
@@ -55,6 +55,11 @@ func createDefaultConfig() component.Config {
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
}
}

@@ -71,17 +76,17 @@ func createLogsExporter(
set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.")
}

exporter, err := newLogsExporter(set.Logger, cf)
logsExporter, err := newLogsExporter(set.Logger, cf)
if err != nil {
return nil, fmt.Errorf("cannot configure Elasticsearch logs exporter: %w", err)
return nil, fmt.Errorf("cannot configure Elasticsearch logsExporter: %w", err)
}

return exporterhelper.NewLogsExporter(
ctx,
set,
cfg,
exporter.pushLogsData,
exporterhelper.WithShutdown(exporter.Shutdown),
logsExporter.pushLogsData,
exporterhelper.WithShutdown(logsExporter.Shutdown),
exporterhelper.WithQueue(cf.QueueSettings),
)
}
@@ -91,15 +96,15 @@ func createTracesExporter(ctx context.Context,
cfg component.Config) (exporter.Traces, error) {

cf := cfg.(*Config)
exporter, err := newTracesExporter(set.Logger, cf)
tracesExporter, err := newTracesExporter(set.Logger, cf)
if err != nil {
return nil, fmt.Errorf("cannot configure Elasticsearch traces exporter: %w", err)
return nil, fmt.Errorf("cannot configure Elasticsearch tracesExporter: %w", err)
}
return exporterhelper.NewTracesExporter(
ctx,
set,
cfg,
exporter.pushTraceData,
exporterhelper.WithShutdown(exporter.Shutdown),
tracesExporter.pushTraceData,
exporterhelper.WithShutdown(tracesExporter.Shutdown),
exporterhelper.WithQueue(cf.QueueSettings))
}
2 changes: 2 additions & 0 deletions exporter/elasticsearchexporter/go.mod
@@ -6,6 +6,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/elastic/go-structform v0.0.10
github.com/lestrrat-go/strftime v1.0.6
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.90.1
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.90.1
github.com/stretchr/testify v1.8.4
@@ -34,6 +35,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector v0.90.2-0.20231201205146-6e2fdc755b34 // indirect
6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/go.sum


25 changes: 18 additions & 7 deletions exporter/elasticsearchexporter/logs_exporter.go
@@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
@@ -18,9 +19,10 @@
type elasticsearchLogsExporter struct {
logger *zap.Logger

index string
dynamicIndex bool
maxAttempts int
index string
logstashFormat LogstashFormatSettings
dynamicIndex bool
maxAttempts int

client *esClientCurrent
bulkIndexer esBulkIndexerCurrent
@@ -62,10 +64,11 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporte
client: client,
bulkIndexer: bulkIndexer,

index: indexStr,
dynamicIndex: cfg.LogsDynamicIndex.Enabled,
maxAttempts: maxAttempts,
model: model,
index: indexStr,
dynamicIndex: cfg.LogsDynamicIndex.Enabled,
maxAttempts: maxAttempts,
model: model,
logstashFormat: cfg.LogstashFormat,
}
return esLogsExp, nil
}
@@ -109,6 +112,14 @@ func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource
fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}

if e.logstashFormat.Enabled {
formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now())
if err != nil {
return err
}
fIndex = formattedIndex
}

document, err := e.model.encodeLog(resource, record, scope)
if err != nil {
return fmt.Errorf("Failed to encode log event: %w", err)
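The hunk above applies the Logstash date suffix after any dynamic prefix/suffix resolution, so the two features compose. A condensed, runnable model of that ordering (names follow the diff and the tests below; this is a sketch, not the exporter's verbatim code):

```go
package main

import (
	"fmt"
	"time"

	"github.com/lestrrat-go/strftime"
)

// resolveIndex models pushLogRecord's index resolution order:
// dynamic prefix/suffix first, then the Logstash date suffix.
func resolveIndex(base, prefix, suffix string, dynamic, logstash bool, sep, dateFormat string, t time.Time) (string, error) {
	fIndex := base
	if dynamic {
		// elasticsearch.index.prefix / elasticsearch.index.suffix attributes,
		// with resource attributes taking priority over record attributes.
		fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
	}
	if logstash {
		date, err := strftime.Format(dateFormat, t)
		if err != nil {
			return "", err
		}
		fIndex = fIndex + sep + date
	}
	return fIndex, nil
}

func main() {
	// Values mirror the "dynamic index enabled" test case below.
	idx, _ := resolveIndex("someindex", "resprefix-", "-attrsuffix", true, true, "-", "%Y.%m.%d", time.Now())
	fmt.Println(idx) // e.g. resprefix-someindex-attrsuffix-2023.12.08
}
```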
75 changes: 75 additions & 0 deletions exporter/elasticsearchexporter/logs_exporter_test.go
@@ -10,6 +10,7 @@ import (
"fmt"
"net/http"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
@@ -210,6 +211,80 @@ func TestExporter_PushEvent(t *testing.T) {
rec.WaitItems(1)
})

t.Run("publish with logstash index format enabled and dynamic index disabled", func(t *testing.T) {
var defaultCfg Config
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)

data, err := docs[0].Action.MarshalJSON()
assert.Nil(t, err)

jsonVal := map[string]any{}
err = json.Unmarshal(data, &jsonVal)
assert.Nil(t, err)

create := jsonVal["create"].(map[string]any)

assert.Equal(t, strings.Contains(create["_index"].(string), defaultCfg.LogsIndex), true)

return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.LogstashFormat.Enabled = true
cfg.LogsIndex = "not-used-index"
defaultCfg = *cfg
})

mustSendLogsWithAttributes(t, exporter, nil, nil)

rec.WaitItems(1)
})

t.Run("publish with logstash index format enabled and dynamic index enabled", func(t *testing.T) {
var (
prefix = "resprefix-"
suffix = "-attrsuffix"
index = "someindex"
)
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)

data, err := docs[0].Action.MarshalJSON()
assert.Nil(t, err)

jsonVal := map[string]any{}
err = json.Unmarshal(data, &jsonVal)
assert.Nil(t, err)

create := jsonVal["create"].(map[string]any)
expected := fmt.Sprintf("%s%s%s", prefix, index, suffix)

assert.Equal(t, strings.Contains(create["_index"].(string), expected), true)

return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.LogsIndex = index
cfg.LogsDynamicIndex.Enabled = true
cfg.LogstashFormat.Enabled = true
})

mustSendLogsWithAttributes(t, exporter,
map[string]string{
indexPrefix: "attrprefix-",
indexSuffix: suffix,
},
map[string]string{
indexPrefix: prefix,
},
)
rec.WaitItems(1)
})

t.Run("retry http request", func(t *testing.T) {
failures := 0
rec := newBulkRecorder()
4 changes: 4 additions & 0 deletions exporter/elasticsearchexporter/testdata/config.yaml
@@ -40,3 +40,7 @@ elasticsearch/log:
max_requests: 5
sending_queue:
enabled: true
elasticsearch/logstash_format:
endpoints: [http://localhost:9200]
logstash_format:
enabled: true
