From 73ee3ba913b6a0f1530c7f6c0164b0ea056f3d37 Mon Sep 17 00:00:00 2001 From: PepperKick Date: Sat, 9 Mar 2024 06:10:02 +0530 Subject: [PATCH] [exporter/awss3] Add compression option (27872) (#31622) **Description:** Add `compression` option to compress files using `compress/gzip` library before uploading to S3. **Link to tracking Issue:** Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27872 **Testing:** Sent n number of traces through the S3 exporter using k6 to compare sizes. Used Minio as the S3 backend. | Marshaler | Compression | k6 Requests | k6 Data Sent | S3 Objects | S3 Total Size | | --- | --- | --- | --- | --- | --- | | otlp_json | No | 101 | 118 KB | 101 | 36 KB | | otlp_proto | No | 101 | 118 KB | 101 | 11 KB | | otlp_json | Yes | 101 | 118 KB | 101 | 21 KB | | otlp_proto | Yes | 101 | 118 KB | 101 | 9.9 KB | Additionally, new unit test to check file name. **Documentation:** - Updated README.md file --- .chloggen/add-compression-option.yaml | 27 ++++++++++++ exporter/awss3exporter/README.md | 29 +++++++------ exporter/awss3exporter/config.go | 30 +++++++++---- exporter/awss3exporter/config_test.go | 42 +++++++++++++++++++ exporter/awss3exporter/go.mod | 1 + exporter/awss3exporter/go.sum | 2 + exporter/awss3exporter/s3_writer.go | 42 +++++++++++++++---- exporter/awss3exporter/s3_writer_test.go | 16 ++++++- .../awss3exporter/testdata/compression.yaml | 26 ++++++++++++ 9 files changed, 185 insertions(+), 30 deletions(-) create mode 100644 .chloggen/add-compression-option.yaml create mode 100644 exporter/awss3exporter/testdata/compression.yaml diff --git a/.chloggen/add-compression-option.yaml b/.chloggen/add-compression-option.yaml new file mode 100644 index 000000000000..1f3b91b5daf4 --- /dev/null +++ b/.chloggen/add-compression-option.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awss3exporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "add `compression` option to enable file compression on S3" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [ 27872 ] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Add `compression` option to compress files using `compress/gzip` library before uploading to S3. +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [ user ] \ No newline at end of file diff --git a/exporter/awss3exporter/README.md b/exporter/awss3exporter/README.md index 2f29ba5eb4db..99198242954a 100644 --- a/exporter/awss3exporter/README.md +++ b/exporter/awss3exporter/README.md @@ -22,18 +22,19 @@ This exporter targets to support proto/json format. The following exporter configuration parameters are supported. -| Name | Description | Default | -|:----------------------|:---------------------------------------------------------------------------------------------------------------------------------------------|-------------| -| `region` | AWS region. | "us-east-1" | -| `s3_bucket` | S3 bucket | | -| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | | -| `s3_partition` | time granularity of S3 key: hour or minute | "minute" | -| `role_arn` | the Role ARN to be assumed | | -| `file_prefix` | file prefix defined by user | | -| `marshaler` | marshaler used to produce output data | `otlp_json` | -| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | -| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | -| `disable_ssl` | set this to `true` to disable SSL when sending requests | false | +| Name | Description | Default | +|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------| +| `region` | AWS region. | "us-east-1" | +| `s3_bucket` | S3 bucket | | +| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | | +| `s3_partition` | time granularity of S3 key: hour or minute | "minute" | +| `role_arn` | the Role ARN to be assumed | | +| `file_prefix` | file prefix defined by user | | +| `marshaler` | marshaler used to produce output data | `otlp_json` | +| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | +| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | +| `disable_ssl` | set this to `true` to disable SSL when sending requests | false | +| `compression` | should the file be compressed | none | ### Marshaler @@ -46,6 +47,10 @@ Marshaler determines the format of data sent to AWS S3. Currently, the following - `body`: export the log body as string. **This format is supported only for logs.** +### Compression +- `none` (default): No compression will be applied +- `gzip`: Files will be compressed with gzip. **This does not support `sumo_ic`marshaler.** + # Example Configuration Following example configuration defines to store output in 'eu-central' region and bucket named 'databucket'. diff --git a/exporter/awss3exporter/config.go b/exporter/awss3exporter/config.go index 8ecd4bf6b2d3..cb7b101fdf5f 100644 --- a/exporter/awss3exporter/config.go +++ b/exporter/awss3exporter/config.go @@ -6,21 +6,23 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect import ( "errors" + "go.opentelemetry.io/collector/config/configcompression" "go.uber.org/multierr" ) // S3UploaderConfig contains aws s3 uploader related config to controls things // like bucket, prefix, batching, connections, retries, etc. type S3UploaderConfig struct { - Region string `mapstructure:"region"` - S3Bucket string `mapstructure:"s3_bucket"` - S3Prefix string `mapstructure:"s3_prefix"` - S3Partition string `mapstructure:"s3_partition"` - FilePrefix string `mapstructure:"file_prefix"` - Endpoint string `mapstructure:"endpoint"` - RoleArn string `mapstructure:"role_arn"` - S3ForcePathStyle bool `mapstructure:"s3_force_path_style"` - DisableSSL bool `mapstructure:"disable_ssl"` + Region string `mapstructure:"region"` + S3Bucket string `mapstructure:"s3_bucket"` + S3Prefix string `mapstructure:"s3_prefix"` + S3Partition string `mapstructure:"s3_partition"` + FilePrefix string `mapstructure:"file_prefix"` + Endpoint string `mapstructure:"endpoint"` + RoleArn string `mapstructure:"role_arn"` + S3ForcePathStyle bool `mapstructure:"s3_force_path_style"` + DisableSSL bool `mapstructure:"disable_ssl"` + Compression configcompression.Type `mapstructure:"compression"` } type MarshalerType string @@ -48,5 +50,15 @@ func (c *Config) Validate() error { if c.S3Uploader.S3Bucket == "" { errs = multierr.Append(errs, errors.New("bucket is required")) } + compression := c.S3Uploader.Compression + if compression.IsCompressed() { + if compression != configcompression.TypeGzip { + errs = multierr.Append(errs, errors.New("unknown compression type")) + } + + if c.MarshalerName == SumoIC { + errs = multierr.Append(errs, errors.New("marshaler does not support compression")) + } + } return errs } diff --git a/exporter/awss3exporter/config_test.go b/exporter/awss3exporter/config_test.go index 4ff16b56b9b7..00dae7ebf4e8 100644 --- a/exporter/awss3exporter/config_test.go +++ b/exporter/awss3exporter/config_test.go @@ -196,3 +196,45 @@ func TestMarshallerName(t *testing.T) { ) } + +func TestCompressionName(t *testing.T) { + factories, err := otelcoltest.NopFactories() + assert.NoError(t, err) + + factory := NewFactory() + factories.Exporters[factory.Type()] = factory + cfg, err := otelcoltest.LoadConfigAndValidate( + filepath.Join("testdata", "compression.yaml"), factories) + + require.NoError(t, err) + require.NotNil(t, cfg) + + e := cfg.Exporters[component.MustNewID("awss3")].(*Config) + + assert.Equal(t, e, + &Config{ + S3Uploader: S3UploaderConfig{ + Region: "us-east-1", + S3Bucket: "foo", + S3Partition: "minute", + Compression: "gzip", + }, + MarshalerName: "otlp_json", + }, + ) + + e = cfg.Exporters[component.MustNewIDWithName("awss3", "proto")].(*Config) + + assert.Equal(t, e, + &Config{ + S3Uploader: S3UploaderConfig{ + Region: "us-east-1", + S3Bucket: "bar", + S3Partition: "minute", + Compression: "none", + }, + MarshalerName: "otlp_proto", + }, + ) + +} diff --git a/exporter/awss3exporter/go.mod b/exporter/awss3exporter/go.mod index 5b1b0ac6bb64..d05c807e4d8f 100644 --- a/exporter/awss3exporter/go.mod +++ b/exporter/awss3exporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/aws/aws-sdk-go v1.50.27 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.96.1-0.20240306115632-b2693620eff6 + go.opentelemetry.io/collector/config/configcompression v0.96.1-0.20240306115632-b2693620eff6 go.opentelemetry.io/collector/confmap v0.96.1-0.20240306115632-b2693620eff6 go.opentelemetry.io/collector/consumer v0.96.1-0.20240306115632-b2693620eff6 go.opentelemetry.io/collector/exporter v0.96.1-0.20240306115632-b2693620eff6 diff --git a/exporter/awss3exporter/go.sum b/exporter/awss3exporter/go.sum index 1ff37207535f..493c8cdb18fb 100644 --- a/exporter/awss3exporter/go.sum +++ b/exporter/awss3exporter/go.sum @@ -147,6 +147,8 @@ go.opentelemetry.io/collector v0.96.1-0.20240306115632-b2693620eff6 h1:yaLqn47nY go.opentelemetry.io/collector v0.96.1-0.20240306115632-b2693620eff6/go.mod h1:lNnB7h6qHClzeyZKauJdXB6wLhTq5sFvt1G70zTYmTs= go.opentelemetry.io/collector/component v0.96.1-0.20240306115632-b2693620eff6 h1:hfSeafFTFnO+X0v9oDSnBQA8wKOYz99VfUSP/SE5cwk= go.opentelemetry.io/collector/component v0.96.1-0.20240306115632-b2693620eff6/go.mod h1:0evn//YPgN/5VmbbD4JS0yH3ikWxwROQN1MKEOM/U3M= +go.opentelemetry.io/collector/config/configcompression v0.96.1-0.20240306115632-b2693620eff6 h1:AA/eiK/rFUYRhkpqGwYHJLO3LwBy9tw4rOgWxy/KQAU= +go.opentelemetry.io/collector/config/configcompression v0.96.1-0.20240306115632-b2693620eff6/go.mod h1:O0fOPCADyGwGLLIf5lf7N3960NsnIfxsm6dr/mIpL+M= go.opentelemetry.io/collector/config/confignet v0.96.0 h1:ZUwziVVxWgcRMqukfKfdEjxfgmfhGsX6J3GEzF/Pupk= go.opentelemetry.io/collector/config/confignet v0.96.0/go.mod h1:BVw5xkQ7TH2wH75cbph+dtOoxq1baWLuhdSYIAvuVu0= go.opentelemetry.io/collector/config/configretry v0.96.1-0.20240306115632-b2693620eff6 h1:BzuuN5Oo7knT4areFJxslVWfSpXAgtovd2KzxcVIjUQ= diff --git a/exporter/awss3exporter/s3_writer.go b/exporter/awss3exporter/s3_writer.go index c0cafeb13466..1518bb48ecf0 100644 --- a/exporter/awss3exporter/s3_writer.go +++ b/exporter/awss3exporter/s3_writer.go @@ -5,6 +5,7 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect import ( "bytes" + "compress/gzip" "context" "fmt" "math/rand" @@ -15,6 +16,7 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "go.opentelemetry.io/collector/config/configcompression" ) type s3Writer struct { @@ -38,12 +40,17 @@ func randomInRange(low, hi int) int { return low + rand.Intn(hi-low) } -func getS3Key(time time.Time, keyPrefix string, partition string, filePrefix string, metadata string, fileformat string) string { +func getS3Key(time time.Time, keyPrefix string, partition string, filePrefix string, metadata string, fileformat string, compression configcompression.Type) string { timeKey := getTimeKey(time, partition) randomID := randomInRange(100000000, 999999999) s3Key := keyPrefix + "/" + timeKey + "/" + filePrefix + metadata + "_" + strconv.Itoa(randomID) + "." + fileformat + // add ".gz" extension to files if compression is enabled + if compression == configcompression.TypeGzip { + s3Key += ".gz" + } + return s3Key } @@ -77,10 +84,28 @@ func (s3writer *s3Writer) writeBuffer(_ context.Context, buf []byte, config *Con now := time.Now() key := getS3Key(now, config.S3Uploader.S3Prefix, config.S3Uploader.S3Partition, - config.S3Uploader.FilePrefix, metadata, format) - - // create a reader from data data in memory - reader := bytes.NewReader(buf) + config.S3Uploader.FilePrefix, metadata, format, config.S3Uploader.Compression) + + encoding := "" + var reader *bytes.Reader + if config.S3Uploader.Compression == configcompression.TypeGzip { + // set s3 uploader content encoding to "gzip" + encoding = "gzip" + var gzipContents bytes.Buffer + + // create a gzip from data + gzipWriter := gzip.NewWriter(&gzipContents) + _, err := gzipWriter.Write(buf) + if err != nil { + return err + } + gzipWriter.Close() + + reader = bytes.NewReader(gzipContents.Bytes()) + } else { + // create a reader from data in memory + reader = bytes.NewReader(buf) + } sessionConfig := getSessionConfig(config) sess, err := getSession(config, sessionConfig) @@ -92,9 +117,10 @@ func (s3writer *s3Writer) writeBuffer(_ context.Context, buf []byte, config *Con uploader := s3manager.NewUploader(sess) _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(config.S3Uploader.S3Bucket), - Key: aws.String(key), - Body: reader, + Bucket: aws.String(config.S3Uploader.S3Bucket), + Key: aws.String(key), + Body: reader, + ContentEncoding: &encoding, }) if err != nil { return err diff --git a/exporter/awss3exporter/s3_writer_test.go b/exporter/awss3exporter/s3_writer_test.go index 8b5e4eedcb5d..1da692ba92fa 100644 --- a/exporter/awss3exporter/s3_writer_test.go +++ b/exporter/awss3exporter/s3_writer_test.go @@ -36,7 +36,21 @@ func TestS3Key(t *testing.T) { require.NotNil(t, tm) re := regexp.MustCompile(`keyprefix/year=2022/month=06/day=05/hour=00/minute=00/fileprefixlogs_([0-9]+).json`) - s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json") + s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json", "") + matched := re.MatchString(s3Key) + assert.Equal(t, true, matched) +} + +func TestS3KeyOfCompressedFile(t *testing.T) { + const layout = "2006-01-02" + + tm, err := time.Parse(layout, "2022-06-05") + + assert.NoError(t, err) + require.NotNil(t, tm) + + re := regexp.MustCompile(`keyprefix/year=2022/month=06/day=05/hour=00/minute=00/fileprefixlogs_([0-9]+).json.gz`) + s3Key := getS3Key(tm, "keyprefix", "minute", "fileprefix", "logs", "json", "gzip") matched := re.MatchString(s3Key) assert.Equal(t, true, matched) } diff --git a/exporter/awss3exporter/testdata/compression.yaml b/exporter/awss3exporter/testdata/compression.yaml new file mode 100644 index 000000000000..68966317b961 --- /dev/null +++ b/exporter/awss3exporter/testdata/compression.yaml @@ -0,0 +1,26 @@ +receivers: + nop: + +exporters: + awss3: + s3uploader: + s3_bucket: "foo" + compression: "gzip" + marshaler: otlp_json + + awss3/proto: + s3uploader: + s3_bucket: "bar" + compression: "none" + marshaler: otlp_proto + + +processors: + nop: + +service: + pipelines: + traces: + receivers: [nop] + processors: [nop] + exporters: [awss3, awss3/proto]