Skip to content

Commit

Permalink
Extract service.pipelines interface, add skeleton graph implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Dec 12, 2022
1 parent d2625f0 commit dbc3603
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 440 deletions.
16 changes: 16 additions & 0 deletions .chloggen/component-nodes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Extract service.pipelines interface, add skeleton graph implementation

# One or more tracking issues or pull requests related to the change
issues: [6764]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opentelemetry.io/contrib/zpages v0.37.0 // indirect
golang.org/x/text v0.4.0 // indirect
gonum.org/v1/gonum v0.12.0 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o=
gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
Expand Down
95 changes: 95 additions & 0 deletions service/graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service // import "go.opentelemetry.io/collector/service"

import (
"context"
"net/http"

"gonum.org/v1/gonum/graph"
"gonum.org/v1/gonum/graph/simple"
"gonum.org/v1/gonum/graph/topo"

"go.opentelemetry.io/collector/component"
)

var _ pipelines = (*serviceGraph)(nil)

type serviceGraph struct {

// All component instances represented as nodes, with directed edges indicating data flow.
componentGraph *simple.DirectedGraph

// Keep track of how nodes relate to pipelines, so we can declare edges in the graph.
pipelineGraphs map[component.ID]*pipelineGraph
}

// A node-based representation of a pipeline configuration.
type pipelineGraph struct {

// Use maps for receivers and exporters to assist with deduplication of connector instances.
receivers map[int64]graph.Node
exporters map[int64]graph.Node

// The order of processors is very important. Therefore use a slice for processors.
processors []graph.Node
}

func (g *serviceGraph) StartAll(ctx context.Context, host component.Host) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
return err
}

// Start exporters first, and work towards receivers
for i := len(nodes) - 1; i >= 0; i-- {
comp, ok := nodes[i].(component.Component)
if !ok {
continue
}
if compErr := comp.Start(ctx, host); compErr != nil {
return compErr
}
}
return nil
}

func (g *serviceGraph) ShutdownAll(ctx context.Context) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
return err
}

// Stop receivers first, and work towards exporters
for i := 0; i < len(nodes); i++ {
comp, ok := nodes[i].(component.Component)
if !ok {
continue
}
if compErr := comp.Shutdown(ctx); compErr != nil {
return compErr
}
}
return nil
}

func (g *serviceGraph) GetExporters() map[component.DataType]map[component.ID]component.Component {
// TODO actual implemenation
return make(map[component.DataType]map[component.ID]component.Component)
}

func (g *serviceGraph) HandleZPages(w http.ResponseWriter, r *http.Request) {
// TODO actual implemenation
}
2 changes: 1 addition & 1 deletion service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type serviceHost struct {
factories component.Factories
buildInfo component.BuildInfo

pipelines *builtPipelines
pipelines
extensions *extensions.Extensions
}

Expand Down
Loading

0 comments on commit dbc3603

Please sign in to comment.