Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unroll processor #2021

Merged
merged 16 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions factories/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/observiq/bindplane-agent/processor/samplingprocessor"
"github.com/observiq/bindplane-agent/processor/spancountprocessor"
"github.com/observiq/bindplane-agent/processor/throughputmeasurementprocessor"
"github.com/observiq/bindplane-agent/processor/unrollprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor"
Expand Down Expand Up @@ -87,4 +88,5 @@ var defaultProcessors = []processor.Factory{
throughputmeasurementprocessor.NewFactory(),
tailsamplingprocessor.NewFactory(),
transformprocessor.NewFactory(),
unrollprocessor.NewFactory(),
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/observiq/bindplane-agent/processor/samplingprocessor v1.66.0
github.com/observiq/bindplane-agent/processor/spancountprocessor v1.66.0
github.com/observiq/bindplane-agent/processor/throughputmeasurementprocessor v1.66.0
github.com/observiq/bindplane-agent/processor/unrollprocessor v1.66.0
github.com/observiq/bindplane-agent/receiver/awss3rehydrationreceiver v1.66.0
github.com/observiq/bindplane-agent/receiver/azureblobrehydrationreceiver v1.66.0
github.com/observiq/bindplane-agent/receiver/httpreceiver v1.66.0
Expand Down Expand Up @@ -853,6 +854,8 @@ replace github.com/observiq/bindplane-agent/processor/datapointcountprocessor =>

replace github.com/observiq/bindplane-agent/processor/lookupprocessor => ./processor/lookupprocessor

replace github.com/observiq/bindplane-agent/processor/unrollprocessor => ./processor/unrollprocessor

replace github.com/observiq/bindplane-agent/expr => ./expr

replace github.com/observiq/bindplane-agent/counter => ./counter
Expand Down
124 changes: 124 additions & 0 deletions processor/unrollprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Unroll Processor

This is an experimental processor that will take a log record with slice bodies and expand each element of the slice into its own log record within the slice.

## Important Note

This is an experimental processor and is expected that this functionality would eventually be moved to an [OTTL function](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl).

## Supported pipelines

- Logs


## How it works

1. The user configures the `unroll` processor in their desired logs pipeline
2. Logs that go into this pipeline with a pcommon.Slice body will have each element of that body be expanded into its own log record


## Configuration
| Field | Type | Default | Description |
| --------- | ------ | ------- | ---------------------------------------------------------------------------------------------------------- |
| field | string | body | note: body is currently the only available value for unrolling; making this configuration currently static |
| recursive | bool | false | whether to recursively unroll body slices of slices |


### Example configuration

```yaml
unroll:
recursive: false
```



## How To

### Split a log record into multiple via a delimiter: ","

The following configuration utilizes the [transformprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor) to first split the original string body and then the unroll processor can create multiple events

```yaml
receivers:
filelog:
include: [ ./test.txt ]
start_at: beginning
processors:
transform:
log_statements:
- context: log
statements:
- set(body, Split(body, ","))
unroll:
exporters:
file:
path: ./test/output.json
service:
pipelines:
logs:
receivers: [filelog]
processors: [transform, unroll]
exporters: [file]
```

<details>
<summary> Sample Data </summary>

```txt
1,2,3
```

```json
{
"resourceLogs": [
{
"resource": {},
"scopeLogs": [
{
"scope": {},
"logRecords": [
{
"observedTimeUnixNano": "1733240156591852000",
"body": { "stringValue": "1" },
"attributes": [
{
"key": "log.file.name",
"value": { "stringValue": "test.txt" }
},
],
"traceId": "",
"spanId": ""
},
{
"observedTimeUnixNano": "1733240156591852000",
"body": { "stringValue": "2" },
"attributes": [
{
"key": "log.file.name",
"value": { "stringValue": "test.txt" }
},
],
"traceId": "",
"spanId": ""
},
{
"observedTimeUnixNano": "1733240156591852000",
"body": { "stringValue": "3" },
"attributes": [
{
"key": "log.file.name",
"value": { "stringValue": "test.txt" }
},
],
"traceId": "",
"spanId": ""
}
]
}
]
}
]
}
```
</details>
43 changes: 43 additions & 0 deletions processor/unrollprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package unrollprocessor contains the logic to unroll logs from a slice in the body field.
package unrollprocessor

import (
"errors"
)

// Config is the configuration for the unroll processor.
type Config struct {
Field UnrollField `mapstructure:"field"`
Recursive bool `mapstructure:"recursive"`
}

// UnrollField is the field to unroll.
type UnrollField string

const (
// UnrollFieldBody is the only supported field for unrolling logs.
UnrollFieldBody UnrollField = "body"
)

// Validate checks the configuration for any issues.
func (c *Config) Validate() error {
if c.Field != UnrollFieldBody {
return errors.New("only unrolling logs from a body slice is currently supported")
}

return nil
}
53 changes: 53 additions & 0 deletions processor/unrollprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package unrollprocessor

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestValidate(t *testing.T) {

testCases := []struct {
desc string
cfg *Config
expectedErr string
}{
{
desc: "valid config",
cfg: createDefaultConfig().(*Config),
},
{
desc: "config without body field",
cfg: &Config{
Field: "attributes",
},
expectedErr: "only unrolling logs from a body slice is currently supported",
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
err := tc.cfg.Validate()
if tc.expectedErr != "" {
require.ErrorContains(t, err, tc.expectedErr)
} else {
require.NoError(t, err)
}
})
}
}
70 changes: 70 additions & 0 deletions processor/unrollprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package unrollprocessor // import "github.com/observiq/bindplane-agent/processor/unrollprocessor"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
)

var processorCapabilities = consumer.Capabilities{MutatesData: true}

// componentType is the value of the "type" key in configuration.
var componentType = component.MustNewType("unroll")

const (
stability = component.StabilityLevelAlpha
)

// NewFactory returns a new factory for the Transform processor.
func NewFactory() processor.Factory {
return processor.NewFactory(
componentType,
createDefaultConfig,
processor.WithLogs(createLogsProcessor, stability),
)
}

func createDefaultConfig() component.Config {
return &Config{
Field: UnrollFieldBody,
}
}

func createLogsProcessor(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Logs,
) (processor.Logs, error) {
oCfg := cfg.(*Config)

proc, err := newUnrollProcessor(oCfg)
if err != nil {
return nil, fmt.Errorf("invalid config for \"unroll\" processor %w", err)
}
return processorhelper.NewLogs(
ctx,
set,
cfg,
nextConsumer,
proc.ProcessLogs,
processorhelper.WithCapabilities(processorCapabilities))
}
47 changes: 47 additions & 0 deletions processor/unrollprocessor/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package unrollprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"
)

func TestNewFactory(t *testing.T) {
factory := NewFactory()
require.Equal(t, componentType, factory.Type())

expectedCfg := &Config{
Field: UnrollFieldBody,
}

cfg, ok := factory.CreateDefaultConfig().(*Config)
require.True(t, ok)
require.Equal(t, expectedCfg, cfg)
}

func TestBadFactory(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Field = "invalid"

_, err := factory.CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, &consumertest.LogsSink{})
require.Error(t, err)
require.ErrorContains(t, err, "invalid config for \"unroll\" processor")
}
Loading