Skip to content

Commit

Permalink
Add full component graph build
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Jan 27, 2023
1 parent 8d4b9d5 commit 0b8d98d
Show file tree
Hide file tree
Showing 6 changed files with 2,113 additions and 28 deletions.
5 changes: 5 additions & 0 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,11 @@ func (b *Builder) CreateLogsToLogs(ctx context.Context, set CreateSettings, next
return f.CreateLogsToLogs(ctx, set, cfg, next)
}

func (b *Builder) IsConfigured(componentID component.ID) bool {
_, ok := b.cfgs[componentID]
return ok
}

func (b *Builder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}
Expand Down
5 changes: 4 additions & 1 deletion connector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,16 @@ func TestBuilderMissingConfig(t *testing.T) {
assert.Nil(t, l2l)
}

func TestBuilderFactory(t *testing.T) {
func TestBuilderGetters(t *testing.T) {
factories, err := MakeFactoryMap([]Factory{NewFactory("foo", nil)}...)
require.NoError(t, err)

cfgs := map[component.ID]component.Config{component.NewID("foo"): struct{}{}}
b := NewBuilder(cfgs, factories)

assert.True(t, b.IsConfigured(component.NewID("foo")))
assert.False(t, b.IsConfigured(component.NewID("bar")))

assert.NotNil(t, b.Factory(component.NewID("foo").Type()))
assert.Nil(t, b.Factory(component.NewID("bar").Type()))
}
Expand Down
236 changes: 229 additions & 7 deletions service/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package service // import "go.opentelemetry.io/collector/service"

import (
"context"
"errors"
"fmt"
"net/http"

"go.uber.org/multierr"
"gonum.org/v1/gonum/graph"
"gonum.org/v1/gonum/graph/simple"
"gonum.org/v1/gonum/graph/topo"

Expand All @@ -31,21 +32,236 @@ var _ pipelines = (*pipelinesGraph)(nil)
type pipelinesGraph struct {
// All component instances represented as nodes, with directed edges indicating data flow.
componentGraph *simple.DirectedGraph

// Keep track of how nodes relate to pipelines, so we can declare edges in the graph.
pipelines map[component.ID]*pipelineNodes
}

func buildPipelinesGraph(ctx context.Context, set pipelinesSettings) (pipelines, error) {
pipelines := &pipelinesGraph{
componentGraph: simple.NewDirectedGraph(),
pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)),
}
for pipelineID := range set.PipelineConfigs {
pipelines.pipelines[pipelineID] = &pipelineNodes{
receivers: make(map[int64]graph.Node),
exporters: make(map[int64]graph.Node),
}
}

if err := pipelines.createNodes(set); err != nil {
return nil, err
}
pipelines.createEdges()
if err := pipelines.buildNodes(ctx, set); err != nil {
return nil, err
}
return pipelines, nil
}

// Creates a node for each instance of a component and adds it to the graph
func (g *pipelinesGraph) createNodes(set pipelinesSettings) error {

// map[connectorID]pipelineIDs
// Keep track of connectors and where they are used.
connectorsAsExporter := make(map[component.ID][]component.ID)
connectorsAsReceiver := make(map[component.ID][]component.ID)

for pipelineID, pipelineCfg := range set.PipelineConfigs {
for _, recvID := range pipelineCfg.Receivers {
if set.Connectors.IsConfigured(recvID) {
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
continue
}
g.addReceiver(pipelineID, recvID)
}
for _, procID := range pipelineCfg.Processors {
g.addProcessor(pipelineID, procID)
}
for _, exprID := range pipelineCfg.Exporters {
if set.Connectors.IsConfigured(exprID) {
connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID)
continue
}
g.addExporter(pipelineID, exprID)
}
}

if len(connectorsAsExporter) != len(connectorsAsReceiver) {
return fmt.Errorf("each connector must be used as both receiver and exporter")
}
for connID, exprPipelineIDs := range connectorsAsExporter {
rcvrPipelineIDs, ok := connectorsAsReceiver[connID]
if !ok {
return fmt.Errorf("connector %q must be used as receiver, only found as exporter", connID)
}
for _, eID := range exprPipelineIDs {
for _, rID := range rcvrPipelineIDs {
g.addConnector(eID, rID, connID)
}
}
}
return nil
}

func (g *pipelinesGraph) addReceiver(pipelineID, recvID component.ID) {
node := newReceiverNode(pipelineID, recvID)
if rcvrNode := g.componentGraph.Node(node.ID()); rcvrNode != nil {
g.pipelines[pipelineID].addReceiver(rcvrNode)
return
}
g.pipelines[pipelineID].addReceiver(node)
g.componentGraph.AddNode(node)
}

func (g *pipelinesGraph) addProcessor(pipelineID, procID component.ID) {
node := newProcessorNode(pipelineID, procID)
g.pipelines[pipelineID].addProcessor(node)
g.componentGraph.AddNode(node)
}

func (g *pipelinesGraph) addExporter(pipelineID, exprID component.ID) {
node := newExporterNode(pipelineID, exprID)
if expNode := g.componentGraph.Node(node.ID()); expNode != nil {
g.pipelines[pipelineID].addExporter(expNode)
return
}
g.pipelines[pipelineID].addExporter(node)
g.componentGraph.AddNode(node)
}

func (g *pipelinesGraph) addConnector(exprPipelineID, rcvrPipelineID, connID component.ID) {
node := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID)
if connNode := g.componentGraph.Node(node.ID()); connNode != nil {
g.pipelines[exprPipelineID].addExporter(connNode)
g.pipelines[rcvrPipelineID].addReceiver(connNode)
return
}
g.pipelines[exprPipelineID].addExporter(node)
g.pipelines[rcvrPipelineID].addReceiver(node)
g.componentGraph.AddNode(node)
}

func (g *pipelinesGraph) createEdges() {
for pipelineID, pg := range g.pipelines {
fanOutToExporters := newFanOutNode(pipelineID)
for _, exporter := range pg.exporters {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(fanOutToExporters, exporter))
}

if len(pg.processors) == 0 {
for _, receiver := range pg.receivers {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, fanOutToExporters))
}
continue
}

fanInToProcessors := newFanInNode(pipelineID)
for _, receiver := range pg.receivers {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, fanInToProcessors))
}

g.componentGraph.SetEdge(g.componentGraph.NewEdge(fanInToProcessors, pg.processors[0]))
for i := 0; i+1 < len(pg.processors); i++ {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(pg.processors[i], pg.processors[i+1]))
}
g.componentGraph.SetEdge(g.componentGraph.NewEdge(pg.processors[len(pg.processors)-1], fanOutToExporters))
}
}

func (g *pipelinesGraph) buildNodes(ctx context.Context, set pipelinesSettings) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
return err // TODO clean up error message
}

for i := len(nodes) - 1; i >= 0; i-- {
node := nodes[i]
switch n := node.(type) {
case *receiverNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Receivers, g.nextConsumers(n.ID()))
case *processorNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Processors, g.nextConsumers(n.ID()))
case *connectorNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Connectors, g.nextConsumers(n.ID()))
case *exporterNode:
err = n.build(ctx, set.Telemetry, set.BuildInfo, set.Exporters)
case *fanInNode:
n.build(g.nextConsumers(n.ID()), g.nextProcessors(n.ID()))
case *fanOutNode:
n.build(g.nextConsumers(n.ID()))
}
if err != nil {
return err
}
}
return nil
}

// Find all nodes
func (g *pipelinesGraph) nextConsumers(nodeID int64) []baseConsumer {
nextNodes := g.componentGraph.From(nodeID)
nexts := make([]baseConsumer, 0, nextNodes.Len())
for nextNodes.Next() {
switch next := nextNodes.Node().(type) {
case *processorNode:
nexts = append(nexts, next.Component.(baseConsumer))
case *exporterNode:
nexts = append(nexts, next.Component.(baseConsumer))
case *connectorNode:
nexts = append(nexts, next.Component.(baseConsumer))
case *fanInNode:
nexts = append(nexts, next.baseConsumer)
case *fanOutNode:
nexts = append(nexts, next.baseConsumer)
}
}
return nexts
}

// Get all processors in this pipeline
func (g *pipelinesGraph) nextProcessors(nodeID int64) []*processorNode {
nextNodes := g.componentGraph.From(nodeID)
if procNode, ok := nextNodes.Node().(*processorNode); ok {
return append([]*processorNode{procNode}, g.nextProcessors(procNode.ID())...)
}
return []*processorNode{}
}

// A node-based representation of a pipeline configuration.
type pipelineNodes struct {
// Use maps for receivers and exporters to assist with deduplication of connector instances.
receivers map[int64]graph.Node
exporters map[int64]graph.Node

// The order of processors is very important. Therefore use a slice for processors.
processors []graph.Node
}

func buildPipelinesGraph(_ context.Context, _ pipelinesSettings) (pipelines, error) {
err := errors.New("not yet implemented")
return &pipelinesGraph{componentGraph: simple.NewDirectedGraph()}, err
func (p *pipelineNodes) addReceiver(node graph.Node) {
p.receivers[node.ID()] = node
}
func (p *pipelineNodes) addProcessor(node graph.Node) {
p.processors = append(p.processors, node)
}
func (p *pipelineNodes) addExporter(node graph.Node) {
p.exporters[node.ID()] = node
}

func (g *pipelinesGraph) StartAll(ctx context.Context, host component.Host) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
return err
}

// Start exporters first, and work towards receivers
for i := len(nodes) - 1; i >= 0; i-- {
if compErr := nodes[i].(component.Component).Start(ctx, host); compErr != nil {
comp, ok := nodes[i].(component.Component)
if !ok {
// Skip fanin/out nodes
continue
}
if compErr := comp.Start(ctx, host); compErr != nil {
return compErr
}
}
Expand All @@ -57,17 +273,23 @@ func (g *pipelinesGraph) ShutdownAll(ctx context.Context) error {
if err != nil {
return err
}

// Stop receivers first, and work towards exporters
var errs error
for i := 0; i < len(nodes); i++ {
errs = multierr.Append(errs, nodes[i].(component.Component).Shutdown(ctx))
comp, ok := nodes[i].(component.Component)
if !ok {
// Skip fanin/out nodes
continue
}
errs = multierr.Append(errs, comp.Shutdown(ctx))
}
return errs
}

func (g *pipelinesGraph) GetExporters() map[component.DataType]map[component.ID]component.Component {
// TODO actual implementation
return make(map[component.DataType]map[component.ID]component.Component)
return nil
}

func (g *pipelinesGraph) HandleZPages(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading

0 comments on commit 0b8d98d

Please sign in to comment.