From dc3071c9094bf1a2a37a02efe319ec117cc606e3 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Mon, 23 Jan 2023 14:54:23 -0500 Subject: [PATCH] Extract service.pipelines interface, add skeleton graph implementation (#6764) --- service/graph.go | 45 +++++++++++++++++++++++++++++++++++++++ service/host.go | 2 +- service/pipelines.go | 13 ++++++++++- service/pipelines_test.go | 4 +++- 4 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 service/graph.go diff --git a/service/graph.go b/service/graph.go new file mode 100644 index 00000000000..dcf57cd3c72 --- /dev/null +++ b/service/graph.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service // import "go.opentelemetry.io/collector/service" + +import ( + "context" + "net/http" + + "go.opentelemetry.io/collector/component" +) + +var _ pipelines = (*pipelinesGraph)(nil) + +type pipelinesGraph struct{} + +func (g *pipelinesGraph) StartAll(ctx context.Context, host component.Host) error { + // TODO actual implementation + return nil +} + +func (g *pipelinesGraph) ShutdownAll(ctx context.Context) error { + // TODO actual implementation + return nil +} + +func (g *pipelinesGraph) GetExporters() map[component.DataType]map[component.ID]component.Component { + // TODO actual implementation + return make(map[component.DataType]map[component.ID]component.Component) +} + +func (g *pipelinesGraph) HandleZPages(w http.ResponseWriter, r *http.Request) { + // TODO actual implementation +} diff --git a/service/host.go b/service/host.go index 52bbfbdee16..26aa9238c76 100644 --- a/service/host.go +++ b/service/host.go @@ -36,7 +36,7 @@ type serviceHost struct { buildInfo component.BuildInfo - pipelines *builtPipelines + pipelines serviceExtensions *extensions.Extensions } diff --git a/service/pipelines.go b/service/pipelines.go index ca9e9270b1f..4659ad88a7a 100644 --- a/service/pipelines.go +++ b/service/pipelines.go @@ -23,6 +23,7 @@ import ( "go.uber.org/multierr" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/processor" @@ -39,6 +40,15 @@ const ( zComponentKind = "zcomponentkind" ) +type pipelines interface { + StartAll(ctx context.Context, host component.Host) error + ShutdownAll(ctx context.Context) error + GetExporters() map[component.DataType]map[component.ID]component.Component + HandleZPages(w http.ResponseWriter, r *http.Request) +} + +var _ pipelines = (*builtPipelines)(nil) + // baseConsumer redeclared here since not public in consumer package. May consider to make that public. type baseConsumer interface { Capabilities() consumer.Capabilities @@ -187,13 +197,14 @@ type pipelinesSettings struct { Receivers *receiver.Builder Processors *processor.Builder Exporters *exporter.Builder + Connectors *connector.Builder // PipelineConfigs is a map of component.ID to PipelineConfig. PipelineConfigs map[component.ID]*PipelineConfig } // buildPipelines builds all pipelines from config. -func buildPipelines(ctx context.Context, set pipelinesSettings) (*builtPipelines, error) { +func buildPipelines(ctx context.Context, set pipelinesSettings) (pipelines, error) { exps := &builtPipelines{ telemetry: set.Telemetry, allReceivers: make(map[component.DataType]map[component.ID]component.Component), diff --git a/service/pipelines_test.go b/service/pipelines_test.go index c8cf3764286..5d6704a8dca 100644 --- a/service/pipelines_test.go +++ b/service/pipelines_test.go @@ -181,7 +181,7 @@ func TestBuildPipelines(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Build the pipeline - pipelines, err := buildPipelines(context.Background(), pipelinesSettings{ + pips, err := buildPipelines(context.Background(), pipelinesSettings{ Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), Receivers: receiver.NewBuilder( @@ -211,7 +211,9 @@ func TestBuildPipelines(t *testing.T) { PipelineConfigs: test.pipelineConfigs, }) assert.NoError(t, err) + assert.IsType(t, &builtPipelines{}, pips) + pipelines := pips.(*builtPipelines) assert.NoError(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) for dt, pipeline := range test.pipelineConfigs {