Skip to content

Commit

Permalink
[pkg/telemetrygen] add support exporting log data (#18741)
Browse files Browse the repository at this point in the history
* [pkg/telemetrygen] add support exporting log data

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>

* add changelog

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>

* fix for lint checkt

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>

* fix for lint check

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>

* Update cmd/telemetrygen/internal/logs/logs.go

Co-authored-by: Pablo Baeyens <pbaeyens31+github@gmail.com>

* Update cmd/telemetrygen/internal/logs/logs.go

Co-authored-by: Pablo Baeyens <pbaeyens31+github@gmail.com>

* fix pdata pacakge version

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>

---------

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
Co-authored-by: Pablo Baeyens <pbaeyens31+github@gmail.com>
  • Loading branch information
fatsheep9146 and mx-psi authored Feb 24, 2023
1 parent 73f6159 commit c35fe0a
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 6 deletions.
16 changes: 16 additions & 0 deletions .chloggen/telemetrygen-log-support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/telemetrygen

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Initial implementation of `telemetrygen logs` subcommand for exporting log data

# One or more tracking issues related to the change
issues: [12927]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
17 changes: 16 additions & 1 deletion cmd/telemetrygen/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (

"github.com/spf13/cobra"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/traces"
)

var (
tracesCfg *traces.Config
metricsCfg *metrics.Config
logsCfg *logs.Config
)

// rootCmd is the root command on which will be run children commands
Expand Down Expand Up @@ -56,15 +58,28 @@ var metricsCmd = &cobra.Command{
},
}

// logsCmd is the command responsible for sending logs
var logsCmd = &cobra.Command{
Use: "logs",
Short: "Simulates a client generating logs",
Example: "telemetrygen logs",
RunE: func(cmd *cobra.Command, args []string) error {
return logs.Start(logsCfg)
},
}

func init() {
rootCmd.AddCommand(tracesCmd, metricsCmd)
rootCmd.AddCommand(tracesCmd, metricsCmd, logsCmd)

tracesCfg = new(traces.Config)
tracesCfg.Flags(tracesCmd.Flags())

metricsCfg = new(metrics.Config)
metricsCfg.Flags(metricsCmd.Flags())

logsCfg = new(logs.Config)
logsCfg.Flags(logsCmd.Flags())

// Disabling completion command for end user
// https://github.com/spf13/cobra/blob/master/shell_completions.md
rootCmd.CompletionOptions.DisableDefaultCmd = true
Expand Down
7 changes: 6 additions & 1 deletion cmd/telemetrygen/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.1
go.opentelemetry.io/collector/pdata v1.0.0-rc6
go.opentelemetry.io/collector/semconv v0.72.0
go.opentelemetry.io/otel v1.13.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.36.0
Expand All @@ -28,15 +29,19 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.13.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.13.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.36.0 // indirect
go.opentelemetry.io/otel/metric v0.36.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
Expand Down
15 changes: 11 additions & 4 deletions cmd/telemetrygen/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions cmd/telemetrygen/internal/logs/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logs

import (
"github.com/spf13/pflag"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common"
)

// Config describes the test scenario.
type Config struct {
common.Config
NumLogs int
}

// Flags registers config flags.
func (c *Config) Flags(fs *pflag.FlagSet) {
c.CommonFlags(fs)
fs.IntVar(&c.NumLogs, "logs", 1, "Number of logs to generate in each worker (ignored if duration is provided)")
}
124 changes: 124 additions & 0 deletions cmd/telemetrygen/internal/logs/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logs

import (
"context"
"fmt"
"sync"
"time"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common"
)

type exporter interface {
export(plog.Logs) error
}

type gRPCClientExporter struct {
client plogotlp.GRPCClient
}

func (e *gRPCClientExporter) export(logs plog.Logs) error {
req := plogotlp.NewExportRequestFromLogs(logs)
if _, err := e.client.Export(context.Background(), req); err != nil {
return err
}
return nil
}

// Start starts the log telemetry generator
func Start(cfg *Config) error {
logger, err := common.CreateLogger()
if err != nil {
return err
}

if cfg.UseHTTP {
return fmt.Errorf("http is not supported by 'telemetrygen logs'")
}

if !cfg.Insecure {
return fmt.Errorf("'telemetrygen logs' only supports insecure gRPC")
}

// only support grpc in insecure mode
clientConn, err := grpc.DialContext(context.TODO(), cfg.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
exporter := &gRPCClientExporter{
client: plogotlp.NewGRPCClient(clientConn),
}

if err = Run(cfg, exporter, logger); err != nil {
logger.Error("failed to stop the exporter", zap.Error(err))
return err
}

return nil
}

// Run executes the test scenario.
func Run(c *Config, exp exporter, logger *zap.Logger) error {
if c.TotalDuration > 0 {
c.NumLogs = 0
} else if c.NumLogs <= 0 {
return fmt.Errorf("either `logs` or `duration` must be greater than 0")
}

limit := rate.Limit(c.Rate)
if c.Rate == 0 {
limit = rate.Inf
logger.Info("generation of logs isn't being throttled")
} else {
logger.Info("generation of logs is limited", zap.Float64("per-second", float64(limit)))
}

wg := sync.WaitGroup{}
running := atomic.NewBool(true)
res := resource.NewWithAttributes(semconv.SchemaURL, c.GetAttributes()...)

for i := 0; i < c.WorkerCount; i++ {
wg.Add(1)
w := worker{
numLogs: c.NumLogs,
limitPerSecond: limit,
totalDuration: c.TotalDuration,
running: running,
wg: &wg,
logger: logger.With(zap.Int("worker", i)),
index: i,
}

go w.simulateLogs(res, exp)
}
if c.TotalDuration > 0 {
time.Sleep(c.TotalDuration)
running.Store(false)
}
wg.Wait()
return nil
}
74 changes: 74 additions & 0 deletions cmd/telemetrygen/internal/logs/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logs

import (
"context"
"sync"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type worker struct {
running *atomic.Bool // pointer to shared flag that indicates it's time to stop the test
numLogs int // how many logs the worker has to generate (only when duration==0)
totalDuration time.Duration // how long to run the test for (overrides `numLogs`)
limitPerSecond rate.Limit // how many logs per second to generate
wg *sync.WaitGroup // notify when done
logger *zap.Logger // logger
index int // worker index
}

func (w worker) simulateLogs(res *resource.Resource, exporter exporter) {
limiter := rate.NewLimiter(w.limitPerSecond, 1)
var i int64

for w.running.Load() {
logs := plog.NewLogs()
nRes := logs.ResourceLogs().AppendEmpty().Resource()
attrs := res.Attributes()
for _, attr := range attrs {
nRes.Attributes().PutStr(string(attr.Key), attr.Value.AsString())
}
log := logs.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
log.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
log.SetDroppedAttributesCount(1)
log.SetSeverityNumber(plog.SeverityNumberInfo)
log.SetSeverityText("Info")
lattrs := log.Attributes()
lattrs.PutStr("app", "server")

if err := exporter.export(logs); err != nil {
w.logger.Fatal("exporter failed", zap.Error(err))
}
if err := limiter.Wait(context.Background()); err != nil {
w.logger.Fatal("limiter wait failed, retry", zap.Error(err))
}

i++
if w.numLogs != 0 && i >= int64(w.numLogs) {
break
}
}

w.logger.Info("logs generated", zap.Int64("logs", i))
w.wg.Done()
}
Loading

0 comments on commit c35fe0a

Please sign in to comment.