Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add full component graph build #7045

Merged
5 changes: 5 additions & 0 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,11 @@ func (b *Builder) CreateLogsToLogs(ctx context.Context, set CreateSettings, next
return f.CreateLogsToLogs(ctx, set, cfg, next)
}

func (b *Builder) IsConfigured(componentID component.ID) bool {
_, ok := b.cfgs[componentID]
return ok
}

func (b *Builder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}
Expand Down
5 changes: 4 additions & 1 deletion connector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,16 @@ func TestBuilderMissingConfig(t *testing.T) {
assert.Nil(t, l2l)
}

func TestBuilderFactory(t *testing.T) {
func TestBuilderGetters(t *testing.T) {
factories, err := MakeFactoryMap([]Factory{NewFactory("foo", nil)}...)
require.NoError(t, err)

cfgs := map[component.ID]component.Config{component.NewID("foo"): struct{}{}}
b := NewBuilder(cfgs, factories)

assert.True(t, b.IsConfigured(component.NewID("foo")))
assert.False(t, b.IsConfigured(component.NewID("bar")))

assert.NotNil(t, b.Factory(component.NewID("foo").Type()))
assert.Nil(t, b.Factory(component.NewID("bar").Type()))
}
Expand Down
237 changes: 229 additions & 8 deletions service/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,248 @@ package service // import "go.opentelemetry.io/collector/service"

import (
"context"
"errors"
"net/http"

"go.uber.org/multierr"
"gonum.org/v1/gonum/graph"
"gonum.org/v1/gonum/graph/simple"
"gonum.org/v1/gonum/graph/topo"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
)

var _ pipelines = (*pipelinesGraph)(nil)

type pipelinesGraph struct {
// All component instances represented as nodes, with directed edges indicating data flow.
componentGraph *simple.DirectedGraph

// Keep track of how nodes relate to pipelines, so we can declare edges in the graph.
pipelines map[component.ID]*pipelineNodes
}

func buildPipelinesGraph(ctx context.Context, set pipelinesSettings) (pipelines, error) {
pipelines := &pipelinesGraph{
componentGraph: simple.NewDirectedGraph(),
pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)),
}
for pipelineID := range set.PipelineConfigs {
pipelines.pipelines[pipelineID] = &pipelineNodes{
receivers: make(map[int64]graph.Node),
exporters: make(map[int64]graph.Node),
}
}
pipelines.createNodes(set)
pipelines.createEdges()
return pipelines, pipelines.buildComponents(ctx, set)
}

// Creates a node for each instance of a component and adds it to the graph
func (g *pipelinesGraph) createNodes(set pipelinesSettings) {
// Keep track of connectors and where they are used. (map[connectorID][]pipelineID)
connectorsAsExporter := make(map[component.ID][]component.ID)
connectorsAsReceiver := make(map[component.ID][]component.ID)

for pipelineID, pipelineCfg := range set.PipelineConfigs {
pipe := g.pipelines[pipelineID]
for _, recvID := range pipelineCfg.Receivers {
if set.ConnectorBuilder.IsConfigured(recvID) {
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
continue
}
rcvrNode := g.createReceiver(pipelineID, recvID)
pipe.receivers[rcvrNode.ID()] = rcvrNode
}

pipe.capabilitiesNode = newCapabilitiesNode(pipelineID)

for _, procID := range pipelineCfg.Processors {
pipe.processors = append(pipe.processors, g.createProcessor(pipelineID, procID))
}

pipe.fanOutNode = newFanOutNode(pipelineID)

for _, exprID := range pipelineCfg.Exporters {
if set.ConnectorBuilder.IsConfigured(exprID) {
connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID)
continue
}
expNode := g.createExporter(pipelineID, exprID)
pipe.exporters[expNode.ID()] = expNode
}
}

for connID, exprPipelineIDs := range connectorsAsExporter {
for _, eID := range exprPipelineIDs {
for _, rID := range connectorsAsReceiver[connID] {
connNode := g.createConnector(eID, rID, connID)
g.pipelines[eID].exporters[connNode.ID()] = connNode
g.pipelines[rID].receivers[connNode.ID()] = connNode
}
}
}
}

func (g *pipelinesGraph) createReceiver(pipelineID, recvID component.ID) *receiverNode {
rcvrNode := newReceiverNode(pipelineID, recvID)
if node := g.componentGraph.Node(rcvrNode.ID()); node != nil {
return node.(*receiverNode)
}
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
g.componentGraph.AddNode(rcvrNode)
return rcvrNode
}

func (g *pipelinesGraph) createProcessor(pipelineID, procID component.ID) *processorNode {
procNode := newProcessorNode(pipelineID, procID)
g.componentGraph.AddNode(procNode)
return procNode
}

func (g *pipelinesGraph) createExporter(pipelineID, exprID component.ID) *exporterNode {
expNode := newExporterNode(pipelineID, exprID)
if node := g.componentGraph.Node(expNode.ID()); node != nil {
return node.(*exporterNode)
}
g.componentGraph.AddNode(expNode)
return expNode
}

func (g *pipelinesGraph) createConnector(exprPipelineID, rcvrPipelineID, connID component.ID) *connectorNode {
connNode := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID)
if node := g.componentGraph.Node(connNode.ID()); node != nil {
return node.(*connectorNode)
}
g.componentGraph.AddNode(connNode)
return connNode
}

func buildPipelinesGraph(_ context.Context, _ pipelinesSettings) (pipelines, error) {
err := errors.New("not yet implemented")
return &pipelinesGraph{componentGraph: simple.NewDirectedGraph()}, err
func (g *pipelinesGraph) createEdges() {
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
for _, pg := range g.pipelines {
for _, receiver := range pg.receivers {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, pg.capabilitiesNode))
}

var from, to graph.Node
from = pg.capabilitiesNode
for _, processor := range pg.processors {
to = processor
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))
from = processor
}
to = pg.fanOutNode
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))

for _, exporter := range pg.exporters {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(pg.fanOutNode, exporter))
}
}
}

func (g *pipelinesGraph) buildComponents(ctx context.Context, set pipelinesSettings) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
return err // TODO clean up error message
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
}

for i := len(nodes) - 1; i >= 0; i-- {
node := nodes[i]
switch n := node.(type) {
case *receiverNode:
n.Component, err = buildReceiver(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ReceiverBuilder,
component.NewIDWithName(n.pipelineType, "*"), g.nextConsumers(n.ID()))
case *processorNode:
n.Component, err = buildProcessor(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ProcessorBuilder,
n.pipelineID, g.nextConsumers(n.ID())[0])
case *exporterNode:
n.Component, err = buildExporter(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ExporterBuilder,
component.NewIDWithName(n.pipelineType, "*"))
case *connectorNode:
n.Component, err = buildConnector(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ConnectorBuilder,
n.exprPipelineType, n.rcvrPipelineType, g.nextConsumers(n.ID()))
case *capabilitiesNode:
n.baseConsumer = g.nextConsumers(n.ID())[0]
for _, proc := range g.pipelines[n.pipelineID].processors {
n.Capabilities.MutatesData = n.Capabilities.MutatesData ||
proc.Component.(baseConsumer).Capabilities().MutatesData
}
case *fanOutNode:
nexts := g.nextConsumers(n.ID())
switch n.pipelineID.Type() {
case component.DataTypeTraces:
consumers := make([]consumer.Traces, 0, len(nexts))
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Traces))
}
n.baseConsumer = fanoutconsumer.NewTraces(consumers)
case component.DataTypeMetrics:
consumers := make([]consumer.Metrics, 0, len(nexts))
for _, next := range nexts {

consumers = append(consumers, next.(consumer.Metrics))
}
n.baseConsumer = fanoutconsumer.NewMetrics(consumers)
case component.DataTypeLogs:
consumers := make([]consumer.Logs, 0, len(nexts))
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Logs))
}
n.baseConsumer = fanoutconsumer.NewLogs(consumers)
}
}
if err != nil {
return err
}
}
return nil
}

// Find all nodes
func (g *pipelinesGraph) nextConsumers(nodeID int64) []baseConsumer {
nextNodes := g.componentGraph.From(nodeID)
nexts := make([]baseConsumer, 0, nextNodes.Len())
for nextNodes.Next() {
nexts = append(nexts, nextNodes.Node().(consumerNode).getConsumer())
}
return nexts
}

// A node-based representation of a pipeline configuration.
type pipelineNodes struct {
// Use map to assist with deduplication of connector instances.
receivers map[int64]graph.Node

// The node to which receivers emit. Passes through to processors.
// Easily accessible as the first node in a pipeline.
*capabilitiesNode

// The order of processors is very important. Therefore use a slice for processors.
processors []*processorNode

// Emits to exporters.
*fanOutNode

// Use map to assist with deduplication of connector instances.
exporters map[int64]graph.Node
}

func (g *pipelinesGraph) StartAll(ctx context.Context, host component.Host) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
return err
}
// Start exporters first, and work towards receivers

// Start in reverse topological order so that downstream components
// are started before upstream components. This ensures that each
// component's consumer is ready to consume.
for i := len(nodes) - 1; i >= 0; i-- {
if compErr := nodes[i].(component.Component).Start(ctx, host); compErr != nil {
comp, ok := nodes[i].(component.Component)
if !ok {
// Skip capabilities/fanout nodes
continue
}
if compErr := comp.Start(ctx, host); compErr != nil {
return compErr
}
}
Expand All @@ -57,10 +269,19 @@ func (g *pipelinesGraph) ShutdownAll(ctx context.Context) error {
if err != nil {
return err
}
// Stop receivers first, and work towards exporters

// Stop in topological order so that upstream components
// are stopped before downstream components. This ensures
// that each component has a chance to drain to it's consumer
// before the consumer is stopped.
var errs error
for i := 0; i < len(nodes); i++ {
errs = multierr.Append(errs, nodes[i].(component.Component).Shutdown(ctx))
comp, ok := nodes[i].(component.Component)
if !ok {
// Skip capabilities/fanout nodes
continue
}
errs = multierr.Append(errs, comp.Shutdown(ctx))
}
return errs
}
Expand Down
Loading