Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add performance test for filelog based kubernetes container logs receiver #2564

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/stanza/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package stanza

import (
// Register parsers for stanza-based log receivers
// Register parsers and transformers for stanza-based log receivers
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/json"
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/regex"
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/severity"
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/time"
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/metadata"
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/restructure"
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/router"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was unable to make linter happy with comment above transformer section. At least one of two linters has been always raising some issues:

Check License finished successfully
internal/stanza/register.go: Import groups are not in the proper order: ["Third party" "Third party"]

or

register.go:22: File is not `goimports`-ed with -local github.com/open-telemetry/opentelemetry-collector-contrib (goimports)
        _ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/time"

or

register.go:23: File is not `gofmt`-ed with `-s` (gofmt)

I tried three different approaches

	_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/time"
	// comment here
	_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/metadata"
	_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/time"

	// comment here
	_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/metadata"
	_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/time"
        
	// comment here (preceding line is tab indented)
	_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/metadata"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to split into two groups - looks good to me like this.

)
318 changes: 318 additions & 0 deletions testbed/datasenders/k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datasenders

import (
"context"
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"
"time"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/testbed/testbed"
)

// FileLogK8sWriter represents abstract container k8s writer
type FileLogK8sWriter struct {
testbed.DataSenderBase
file *os.File
config string
}

// Ensure FileLogK8sWriter implements LogDataSender.
var _ testbed.LogDataSender = (*FileLogK8sWriter)(nil)

// NewFileLogK8sWriter creates a new data sender that will write kubernetes containerd
// log entries to a file, to be tailed by FileLogReceiver and sent to the collector.
//
// config is an Otelcol config appended to the receivers section after executing fmt.Sprintf on it.
// This implies few things:
// - it should contain `%s` which will be replaced with the filename
// - all `%` should be represented as `%%`
// - indentation style matters. Spaces have to be used for indentation
// and it should start with two spaces indentation
//
// Example config:
// |`
// | filelog:
// | include: [ %s ]
// | start_at: beginning
// | operators:
// | type: regex_parser
// | regex: ^(?P<log>.*)$
// | `
func NewFileLogK8sWriter(config string) *FileLogK8sWriter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment to explain what config must contain.

dir, err := ioutil.TempDir("", "namespace-*_test-pod_000011112222333344445555666677778888")
if err != nil {
panic("failed to create temp dir")
}
dir, err = ioutil.TempDir(dir, "*")
if err != nil {
panic("failed to create temp dir")
}

file, err := ioutil.TempFile(dir, "*.log")
if err != nil {
panic("failed to create temp file")
}

f := &FileLogK8sWriter{
file: file,
config: config,
}

return f
}

func (f *FileLogK8sWriter) Start() error {
return nil
}

func (f *FileLogK8sWriter) ConsumeLogs(_ context.Context, logs pdata.Logs) error {
for i := 0; i < logs.ResourceLogs().Len(); i++ {
for j := 0; j < logs.ResourceLogs().At(i).InstrumentationLibraryLogs().Len(); j++ {
ills := logs.ResourceLogs().At(i).InstrumentationLibraryLogs().At(j)
for k := 0; k < ills.Logs().Len(); k++ {
_, err := f.file.Write(append(f.convertLogToTextLine(ills.Logs().At(k)), '\n'))
if err != nil {
return err
}
}
}
}
return nil
}

func (f *FileLogK8sWriter) convertLogToTextLine(lr pdata.LogRecord) []byte {
sb := strings.Builder{}

// Timestamp
sb.WriteString(time.Unix(0, int64(lr.Timestamp())).Format("2006-01-02T15:04:05.000000000Z"))

// Severity
sb.WriteString(" stderr F ")
sb.WriteString(lr.SeverityText())
sb.WriteString(" ")

if lr.Body().Type() == pdata.AttributeValueSTRING {
sb.WriteString(lr.Body().StringVal())
}

lr.Attributes().ForEach(func(k string, v pdata.AttributeValue) {
sb.WriteString(" ")
sb.WriteString(k)
sb.WriteString("=")
switch v.Type() {
case pdata.AttributeValueSTRING:
sb.WriteString(v.StringVal())
case pdata.AttributeValueINT:
sb.WriteString(strconv.FormatInt(v.IntVal(), 10))
case pdata.AttributeValueDOUBLE:
sb.WriteString(strconv.FormatFloat(v.DoubleVal(), 'f', -1, 64))
case pdata.AttributeValueBOOL:
sb.WriteString(strconv.FormatBool(v.BoolVal()))
default:
panic("missing case")
}
})

return []byte(sb.String())
}

func (f *FileLogK8sWriter) Flush() {
_ = f.file.Sync()
}

func (f *FileLogK8sWriter) GenConfigYAMLStr() string {
// Note that this generates a receiver config for agent.
// We are testing filelog receiver here.

return fmt.Sprintf(f.config, f.file.Name())
}

func (f *FileLogK8sWriter) ProtocolName() string {
return "filelog"
}

func (f *FileLogK8sWriter) GetEndpoint() string {
return ""
}

// NewKubernetesContainerWriter returns FileLogK8sWriter with configuration
// to recognize and parse kubernetes container logs
func NewKubernetesContainerWriter() *FileLogK8sWriter {
return NewFileLogK8sWriter(`
filelog:
include: [ %s ]
start_at: beginning
include_file_path: true
include_file_name: false
operators:
# Find out which format is used by kubernetes
- type: router
id: get-format
routes:
- output: parser-docker
expr: '$$record matches "^\\{"'
- output: parser-crio
expr: '$$record matches "^[^ Z]+ "'
- output: parser-containerd
expr: '$$record matches "^[^ Z]+Z"'
# Parse CRI-O format
- type: regex_parser
id: parser-crio
regex: '^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
output: extract_metadata_from_filepath
timestamp:
parse_from: time
layout_type: gotime
layout: '2006-01-02T15:04:05.000000000-07:00'
# Parse CRI-Containerd format
- type: regex_parser
id: parser-containerd
regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
output: extract_metadata_from_filepath
timestamp:
parse_from: time
layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
# Parse Docker format
- type: json_parser
id: parser-docker
output: extract_metadata_from_filepath
timestamp:
parse_from: time
layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
# Extract metadata from file path
- type: regex_parser
id: extract_metadata_from_filepath
regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<run_id>\d+)\.log$'
parse_from: $$attributes.file_path
# Move out attributes to Attributes
- type: metadata
attributes:
stream: 'EXPR($.stream)'
k8s.container.name: 'EXPR($.container_name)'
k8s.namespace.name: 'EXPR($.namespace)'
k8s.pod.name: 'EXPR($.pod_name)'
run_id: 'EXPR($.run_id)'
k8s.pod.uid: 'EXPR($.uid)'
# Clean up log record
- type: restructure
id: clean-up-log-record
ops:
- remove: logtag
- remove: stream
- remove: container_name
- remove: namespace
- remove: pod_name
- remove: run_id
- remove: uid
`)
}

// NewKubernetesCRIContainerdWriter returns FileLogK8sWriter with configuration
// to parse only CRI-Containerd kubernetes logs
func NewKubernetesCRIContainerdWriter() *FileLogK8sWriter {
return NewFileLogK8sWriter(`
filelog:
include: [ %s ]
start_at: beginning
include_file_path: true
include_file_name: false
operators:
# Parse CRI-Containerd format
- type: regex_parser
id: parser-containerd
regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
output: extract_metadata_from_filepath
timestamp:
parse_from: time
layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
# Extract metadata from file path
- type: regex_parser
id: extract_metadata_from_filepath
regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<run_id>\d+)\.log$'
parse_from: $$attributes.file_path
# Move out attributes to Attributes
- type: metadata
attributes:
stream: 'EXPR($.stream)'
k8s.container.name: 'EXPR($.container_name)'
k8s.namespace.name: 'EXPR($.namespace)'
k8s.pod.name: 'EXPR($.pod_name)'
run_id: 'EXPR($.run_id)'
k8s.pod.uid: 'EXPR($.uid)'
# Clean up log record
- type: restructure
id: clean-up-log-record
ops:
- remove: logtag
- remove: stream
- remove: container_name
- remove: namespace
- remove: pod_name
- remove: run_id
- remove: uid
`)
}

// NewKubernetesCRIContainerdNoAttributesOpsWriter returns FileLogK8sWriter with configuration
// to parse only CRI-Containerd kubernetes logs without reformatting attributes
func NewKubernetesCRIContainerdNoAttributesOpsWriter() *FileLogK8sWriter {
return NewFileLogK8sWriter(`
filelog:
include: [ %s ]
start_at: beginning
include_file_path: true
include_file_name: false
operators:
# Parse CRI-Containerd format
- type: regex_parser
id: parser-containerd
regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
output: extract_metadata_from_filepath
timestamp:
parse_from: time
layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
# Extract metadata from file path
- type: regex_parser
id: extract_metadata_from_filepath
regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<run_id>\d+)\.log$'
parse_from: $$attributes.file_path
`)
}

// NewCRIContainerdWriter returns FileLogK8sWriter with configuration
// to parse only CRI-Containerd logs (no extracting metadata from filename)
func NewCRIContainerdWriter() *FileLogK8sWriter {
return NewFileLogK8sWriter(`
filelog:
include: [ %s ]
start_at: beginning
include_file_path: true
include_file_name: false
operators:
# Parse CRI-Containerd format
- type: regex_parser
id: parser-containerd
regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
timestamp:
parse_from: time
layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
`)
}
36 changes: 36 additions & 0 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,42 @@ func TestLog10kDPS(t *testing.T) {
ExpectedMaxRAM: 85,
},
},
{
name: "kubernetes containers",
sender: datasenders.NewKubernetesContainerWriter(),
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 100,
ExpectedMaxRAM: 150,
},
},
{
name: "k8s CRI-Containerd",
sender: datasenders.NewKubernetesCRIContainerdWriter(),
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 100,
ExpectedMaxRAM: 150,
},
},
{
name: "k8s CRI-Containerd no attr ops",
sender: datasenders.NewKubernetesCRIContainerdNoAttributesOpsWriter(),
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 100,
ExpectedMaxRAM: 150,
},
},
{
name: "CRI-Containerd",
sender: datasenders.NewCRIContainerdWriter(),
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 100,
ExpectedMaxRAM: 150,
},
},
}

processors := map[string]string{
Expand Down