Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce OTLP receiver configuration flags #3710

Merged
merged 7 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
agentGrpcRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
"github.com/jaegertracing/jaeger/cmd/all-in-one/setupcontext"
collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app"
collectorFlags "github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/docs"
"github.com/jaegertracing/jaeger/cmd/env"
"github.com/jaegertracing/jaeger/cmd/flags"
Expand Down Expand Up @@ -146,7 +147,7 @@ by default uses only in-memory database.`,
if err != nil {
logger.Fatal("Failed to configure connection for grpc", zap.Error(err))
}
cOpts, err := new(collectorApp.CollectorOptions).InitFromViper(v)
cOpts, err := new(collectorFlags.CollectorOptions).InitFromViper(v, logger)
if err != nil {
logger.Fatal("Failed to initialize collector", zap.Error(err))
}
Expand Down Expand Up @@ -227,7 +228,7 @@ by default uses only in-memory database.`,
agentApp.AddFlags,
agentRep.AddFlags,
agentGrpcRep.AddFlags,
collectorApp.AddFlags,
collectorFlags.AddFlags,
queryApp.AddFlags,
strategyStoreFactory.AddFlags,
metricsReaderFactory.AddFlags,
Expand Down
33 changes: 17 additions & 16 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
Expand All @@ -34,6 +35,11 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
metricNumWorkers = "collector.num-workers"
metricQueueSize = "collector.queue-size"
)

// Collector returns the collector as a manageable unit of work
type Collector struct {
// required to start a new collector
Expand Down Expand Up @@ -82,10 +88,10 @@ func New(params *CollectorParams) *Collector {
}

// Start the component and underlying dependencies
func (c *Collector) Start(options *CollectorOptions) error {
func (c *Collector) Start(options *flags.CollectorOptions) error {
handlerBuilder := &SpanHandlerBuilder{
SpanWriter: c.spanWriter,
CollectorOpts: *options,
CollectorOpts: options,
Logger: c.logger,
MetricsFactory: c.metricsFactory,
}
Expand Down Expand Up @@ -145,28 +151,23 @@ func (c *Collector) Start(options *CollectorOptions) error {
}
c.zkServer = zkServer

otlpReceiver, err := handler.StartOtelReceiver(
handler.OtelReceiverOptions{
GRPCHostPort: options.OTLP.GRPCHostPort,
HTTPHostPort: options.OTLP.HTTPHostPort,
},
c.logger,
c.spanProcessor,
)
if err != nil {
return fmt.Errorf("could not start OTLP receiver: %w", err)
if options.OTLP.Enabled {
otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor)
if err != nil {
return fmt.Errorf("could not start OTLP receiver: %w", err)
}
c.otlpReceiver = otlpReceiver
}
c.otlpReceiver = otlpReceiver

c.publishOpts(options)

return nil
}

func (c *Collector) publishOpts(cOpts *CollectorOptions) {
func (c *Collector) publishOpts(cOpts *flags.CollectorOptions) {
internalFactory := c.metricsFactory.Namespace(metrics.NSOptions{Name: "internal"})
internalFactory.Gauge(metrics.Options{Name: collectorNumWorkers}).Update(int64(cOpts.NumWorkers))
internalFactory.Gauge(metrics.Options{Name: collectorQueueSize}).Update(int64(cOpts.QueueSize))
internalFactory.Gauge(metrics.Options{Name: metricNumWorkers}).Update(int64(cOpts.NumWorkers))
internalFactory.Gauge(metrics.Options{Name: metricQueueSize}).Update(int64(cOpts.QueueSize))
}

// Close the component and all its underlying dependencies
Expand Down
25 changes: 17 additions & 8 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
Expand All @@ -34,12 +35,14 @@ import (

var _ (io.Closer) = (*Collector)(nil)

func optionsForEphemeralPorts() *CollectorOptions {
collectorOpts := &CollectorOptions{}
func optionsForEphemeralPorts() *flags.CollectorOptions {
collectorOpts := &flags.CollectorOptions{}
collectorOpts.GRPC.HostPort = ":0"
collectorOpts.HTTP.HostPort = ":0"
collectorOpts.OTLP.GRPCHostPort = ":0"
collectorOpts.OTLP.HTTPHostPort = ":0"
collectorOpts.OTLP.Enabled = true
collectorOpts.OTLP.GRPC.HostPort = ":0"
collectorOpts.OTLP.HTTP.HostPort = ":0"
collectorOpts.Zipkin.HTTPHostPort = ":0"
return collectorOpts
}

Expand All @@ -59,13 +62,15 @@ func TestNewCollector(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
})

collectorOpts := optionsForEphemeralPorts()
require.NoError(t, c.Start(collectorOpts))
assert.NotNil(t, c.SpanHandlers())
assert.NoError(t, c.Close())
}

func TestCollector_StartErrors(t *testing.T) {
run := func(name string, options *CollectorOptions, expErr string) {
run := func(name string, options *flags.CollectorOptions, expErr string) {
t.Run(name, func(t *testing.T) {
hc := healthcheck.New()
logger := zap.NewNop()
Expand All @@ -87,7 +92,7 @@ func TestCollector_StartErrors(t *testing.T) {
})
}

var options *CollectorOptions
var options *flags.CollectorOptions

options = optionsForEphemeralPorts()
options.GRPC.HostPort = ":-1"
Expand All @@ -102,8 +107,12 @@ func TestCollector_StartErrors(t *testing.T) {
run("Zipkin", options, "could not start Zipkin server")

options = optionsForEphemeralPorts()
options.OTLP.HTTPHostPort = ":-1"
run("OTLP", options, "could not start OTLP receiver")
options.OTLP.GRPC.HostPort = ":-1"
run("OTLP/GRPC", options, "could not start OTLP receiver")

options = optionsForEphemeralPorts()
options.OTLP.HTTP.HostPort = ":-1"
run("OTLP/HTTP", options, "could not start OTLP receiver")
}

type mockStrategyStore struct {
Expand Down
158 changes: 0 additions & 158 deletions cmd/collector/app/flags.go

This file was deleted.

Loading