Skip to content

Commit

Permalink
[exporter/file] Add group_by configuration (#31396)
Browse files Browse the repository at this point in the history
**Description:** 

Added the option to write telemetry data into multiple files, where the
file path is based on a resource attribute.

**Link to tracking Issue:** 

#24654

**Testing:** 

Added tests and benchmark for new functionality.

**Documentation:**

Updated README.md
  • Loading branch information
adam-kiss-sg authored Mar 12, 2024
1 parent 3979dcc commit b9ce2d3
Show file tree
Hide file tree
Showing 13 changed files with 1,003 additions and 23 deletions.
27 changes: 27 additions & 0 deletions .chloggen/file-exporter-group-by-attr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: fileexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added the option to write telemetry data into multiple files, where the file path is based on a resource attribute.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24654]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
16 changes: 15 additions & 1 deletion exporter/fileexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ Exporter supports the following features:

+ Support for compressing the telemetry data before exporting.

+ Support for writing into multiple files, where the file path is determined by a resource attribute.

Please note that there is no guarantee that exact field names will remain stable.
This intended for primarily for debugging Collector without setting up backends.

The official [opentelemetry-collector-contrib container](https://hub.docker.com/r/otel/opentelemetry-collector-contrib/tags#!) does not have a writable filesystem by default since it's built on the `scratch` layer.
As such, you will need to create a writable directory for the path, potentially by mounting writable volumes or creating a custom image.
Expand All @@ -52,6 +52,11 @@ The following settings are optional:
- `flush_interval`[default: 1s]: `time.Duration` interval between flushes. See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for valid formats.
NOTE: a value without unit is in nanoseconds and `flush_interval` is ignored and writes are not buffered if `rotation` is set.

- `group_by` enables writing to separate files based on a resource attribute.
- enabled: [default: false] enables group_by. When group_by is enabled, rotation setting is ignored.
- resource_attribute: [default: fileexporter.path_segment]: specifies the name of the resource attribute that contains the path segment of the file to write to. The final path will be the `path` config value, with the `*` replaced with the value of this resource attribute.
- max_open_files: [default: 100]: specifies the maximum number of open file descriptors for the output files.

## File Rotation
Telemetry data is exported to a single file by default.
`fileexporter` only enables file rotation when the user specifies `rotation:` in the config. However, if specified, related default settings would apply.
Expand Down Expand Up @@ -79,6 +84,15 @@ When `format` is json and `compression` is none , telemetry data is written to f

Otherwise, when using `proto` format or any kind of encoding, each encoded object is preceded by 4 bytes (an unsigned 32 bit integer) which represent the number of bytes contained in the encoded object.When we need read the messages back in, we read the size, then read the bytes into a separate buffer, then parse from that buffer.

## Group by attribute

By specifying `group_by.resource_attribute` in the config, the exporter will determine a filepath for each telemetry record, by substituting the value of the resource attribute into the `path` configuration value.

The final path is guaranteed to start with the prefix part of the `path` config value (the part before the `*` character). For example if `path` is "/data/*.json", and the resource attribute value is "../etc/my_config", then the final path will be sanitized to "/data/etc/my_config.json".

The final path can contain path separators (`/`). The exporter will create missing directories recursively (similarly to `mkdir -p`).

Grouping by attribute currently only supports a **single** **resource** attribute. If you would like to use multiple attributes, please use [Transform processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor) create a routing key. If you would like to use a non-resource level (eg: Log/Metric/DataPoint) attribute, please use [Group by Attributes processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/groupbyattrsprocessor) first.

## Example:

Expand Down
40 changes: 38 additions & 2 deletions exporter/fileexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package fileexporter // import "github.com/open-telemetry/opentelemetry-collecto
import (
"errors"
"fmt"
"strings"
"time"

"go.opentelemetry.io/collector/component"
Expand All @@ -23,13 +24,14 @@ type Config struct {
// Path of the file to write to. Path is relative to current directory.
Path string `mapstructure:"path"`

// Mode defines whether the exporter should append to the file
// Mode defines whether the exporter should append to the file.
// Options:
// - false[default]: truncates the file
// - true: appends to the file.
Append bool `mapstructure:"append"`

// Rotation defines an option about rotation of telemetry files
// Rotation defines an option about rotation of telemetry files. Ignored
// when GroupByAttribute is used.
Rotation *Rotation `mapstructure:"rotation"`

// FormatType define the data format of encoded telemetry data
Expand All @@ -45,6 +47,9 @@ type Config struct {
// FlushInterval is the duration between flushes.
// See time.ParseDuration for valid values.
FlushInterval time.Duration `mapstructure:"flush_interval"`

// GroupBy enables writing to separate files based on a resource attribute.
GroupBy *GroupBy `mapstructure:"group_by"`
}

// Rotation an option to rolling log files
Expand All @@ -70,6 +75,21 @@ type Rotation struct {
LocalTime bool `mapstructure:"localtime"`
}

type GroupBy struct {
// Enables group_by. When group_by is enabled, rotation setting is ignored. Default is false.
Enabled bool `mapstructure:"enabled"`

// ResourceAttribute specifies the name of the resource attribute that
// contains the path segment of the file to write to. The final path will be
// the Path config value, with the * replaced with the value of this resource
// attribute. Default is "fileexporter.path_segment".
ResourceAttribute string `mapstructure:"resource_attribute"`

// MaxOpenFiles specifies the maximum number of open file descriptors for the output files.
// The default is 100.
MaxOpenFiles int `mapstructure:"max_open_files"`
}

var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
Expand All @@ -92,6 +112,22 @@ func (cfg *Config) Validate() error {
if cfg.FlushInterval < 0 {
return errors.New("flush_interval must be larger than zero")
}

if cfg.GroupBy != nil && cfg.GroupBy.Enabled {
pathParts := strings.Split(cfg.Path, "*")
if len(pathParts) != 2 {
return errors.New("path must contain exatcly one * when group_by is enabled")
}

if len(pathParts[0]) == 0 {
return errors.New("path must not start with * when group_by is enabled")
}

if cfg.GroupBy.ResourceAttribute == "" {
return errors.New("resource_attribute must not be empty when group_by is enabled")
}
}

return nil
}

Expand Down
66 changes: 66 additions & 0 deletions exporter/fileexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func TestLoadConfig(t *testing.T) {
},
FormatType: formatTypeJSON,
FlushInterval: time.Second,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
Expand All @@ -54,6 +58,10 @@ func TestLoadConfig(t *testing.T) {
FormatType: formatTypeProto,
Compression: compressionZSTD,
FlushInterval: time.Second,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
Expand All @@ -65,6 +73,10 @@ func TestLoadConfig(t *testing.T) {
MaxBackups: defaultMaxBackups,
},
FlushInterval: time.Second,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
Expand All @@ -77,6 +89,10 @@ func TestLoadConfig(t *testing.T) {
},
FormatType: formatTypeJSON,
FlushInterval: time.Second,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
Expand All @@ -93,6 +109,10 @@ func TestLoadConfig(t *testing.T) {
Path: "./flushed",
FlushInterval: 5,
FormatType: formatTypeJSON,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
Expand All @@ -101,6 +121,10 @@ func TestLoadConfig(t *testing.T) {
Path: "./flushed",
FlushInterval: 5 * time.Second,
FormatType: formatTypeJSON,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
Expand All @@ -109,6 +133,10 @@ func TestLoadConfig(t *testing.T) {
Path: "./flushed",
FlushInterval: 500 * time.Millisecond,
FormatType: formatTypeJSON,
GroupBy: &GroupBy{
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
Expand All @@ -119,6 +147,44 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, ""),
errorMessage: "path must be non-empty",
},
{
id: component.NewIDWithName(metadata.Type, "group_by"),
expected: &Config{
Path: "./group_by/*.json",
FlushInterval: time.Second,
FormatType: formatTypeJSON,
GroupBy: &GroupBy{
Enabled: true,
MaxOpenFiles: 10,
ResourceAttribute: "dummy",
},
},
},
{
id: component.NewIDWithName(metadata.Type, "group_by_defaults"),
expected: &Config{
Path: "./group_by/*.json",
FlushInterval: time.Second,
FormatType: formatTypeJSON,
GroupBy: &GroupBy{
Enabled: true,
MaxOpenFiles: defaultMaxOpenFiles,
ResourceAttribute: defaultResourceAttribute,
},
},
},
{
id: component.NewIDWithName(metadata.Type, "group_by_invalid_path"),
errorMessage: "path must contain exatcly one * when group_by is enabled",
},
{
id: component.NewIDWithName(metadata.Type, "group_by_invalid_path2"),
errorMessage: "path must not start with * when group_by is enabled",
},
{
id: component.NewIDWithName(metadata.Type, "group_by_empty_resource_attribute"),
errorMessage: "resource_attribute must not be empty when group_by is enabled",
},
}

for _, tt := range tests {
Expand Down
33 changes: 25 additions & 8 deletions exporter/fileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter/internal/metadata"
Expand All @@ -32,6 +33,10 @@ const (

// the type of compression codec
compressionZSTD = "zstd"

defaultMaxOpenFiles = 100

defaultResourceAttribute = "fileexporter.path_segment"
)

type FileExporter interface {
Expand All @@ -55,6 +60,10 @@ func createDefaultConfig() component.Config {
return &Config{
FormatType: formatTypeJSON,
Rotation: &Rotation{MaxBackups: defaultMaxBackups},
GroupBy: &GroupBy{
ResourceAttribute: defaultResourceAttribute,
MaxOpenFiles: defaultMaxOpenFiles,
},
}
}

Expand All @@ -63,7 +72,7 @@ func createTracesExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
fe := getOrCreateFileExporter(cfg)
fe := getOrCreateFileExporter(cfg, set.Logger)
return exporterhelper.NewTracesExporter(
ctx,
set,
Expand All @@ -80,7 +89,7 @@ func createMetricsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Metrics, error) {
fe := getOrCreateFileExporter(cfg)
fe := getOrCreateFileExporter(cfg, set.Logger)
return exporterhelper.NewMetricsExporter(
ctx,
set,
Expand All @@ -97,7 +106,7 @@ func createLogsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
fe := getOrCreateFileExporter(cfg)
fe := getOrCreateFileExporter(cfg, set.Logger)
return exporterhelper.NewLogsExporter(
ctx,
set,
Expand All @@ -113,20 +122,28 @@ func createLogsExporter(
// or returns the already cached one. Caching is required because the factory is asked trace and
// metric receivers separately when it gets CreateTracesReceiver() and CreateMetricsReceiver()
// but they must not create separate objects, they must use one Exporter object per configuration.
func getOrCreateFileExporter(cfg component.Config) FileExporter {
func getOrCreateFileExporter(cfg component.Config, logger *zap.Logger) FileExporter {
conf := cfg.(*Config)
fe := exporters.GetOrAdd(cfg, func() component.Component {
return newFileExporter(conf)
return newFileExporter(conf, logger)
})

c := fe.Unwrap()
return c.(FileExporter)
}

func newFileExporter(conf *Config) FileExporter {
return &fileExporter{
conf: conf,
func newFileExporter(conf *Config, logger *zap.Logger) FileExporter {
if conf.GroupBy == nil || !conf.GroupBy.Enabled {
return &fileExporter{
conf: conf,
}
}

return &groupingFileExporter{
conf: conf,
logger: logger,
}

}

func newFileWriter(path string, shouldAppend bool, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) {
Expand Down
9 changes: 1 addition & 8 deletions exporter/fileexporter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,7 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error {

// Start starts the flush timer if set.
func (e *fileExporter) Start(_ context.Context, _ component.Host) error {
e.marshaller = &marshaller{
formatType: e.conf.FormatType,
tracesMarshaler: tracesMarshalers[e.conf.FormatType],
metricsMarshaler: metricsMarshalers[e.conf.FormatType],
logsMarshaler: logsMarshalers[e.conf.FormatType],
compression: e.conf.Compression,
compressor: buildCompressor(e.conf.Compression),
}
e.marshaller = newMarshaller(e.conf)
export := buildExportFunc(e.conf)

var err error
Expand Down
Loading

0 comments on commit b9ce2d3

Please sign in to comment.