Skip to content

Commit

Permalink
Test graph.go, fix bug with shared receivers/exporters
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Oct 31, 2022
1 parent 00401f1 commit 5021741
Show file tree
Hide file tree
Showing 4 changed files with 384 additions and 8 deletions.
34 changes: 30 additions & 4 deletions service/internal/pipelines/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ func (g *pipelinesGraph) addReceiver(
) error {
receiverNodeID := newReceiverNodeID(pipelineID.Type(), recvID)

// If already created a node for this [DataType, ComponentID] nothing to do.
if g.componentGraph.Node(receiverNodeID.ID()) != nil {
if rcvrNode := g.componentGraph.Node(receiverNodeID.ID()); rcvrNode != nil {
g.pipelineGraphs[pipelineID].addReceiver(rcvrNode)
return nil
}

Expand Down Expand Up @@ -228,8 +228,8 @@ func (g *pipelinesGraph) addExporter(
) error {
exporterNodeID := newExporterNodeID(pipelineID.Type(), exprID)

// If already created a node for this [DataType, ComponentID] nothing to do.
if g.componentGraph.Node(exporterNodeID.ID()) != nil {
if expNode := g.componentGraph.Node(exporterNodeID.ID()); expNode != nil {
g.pipelineGraphs[pipelineID].addExporter(expNode)
return nil
}

Expand Down Expand Up @@ -432,6 +432,32 @@ func (g *pipelinesGraph) GetExporters() map[config.DataType]map[config.Component
return exportersMap
}

func (g *pipelinesGraph) getReceivers() map[config.DataType]map[config.ComponentID]component.Receiver {
receiversMap := make(map[config.DataType]map[config.ComponentID]component.Receiver)
receiversMap[config.TracesDataType] = make(map[config.ComponentID]component.Receiver)
receiversMap[config.MetricsDataType] = make(map[config.ComponentID]component.Receiver)
receiversMap[config.LogsDataType] = make(map[config.ComponentID]component.Receiver)

for _, pg := range g.pipelineGraphs {
for _, rcvrNode := range pg.receivers {
rcvrOrConnNode := g.componentGraph.Node(rcvrNode.ID())
rcvrNode, ok := rcvrOrConnNode.(*receiverNode)
if !ok {
continue
}
switch rcvrNode.pipelineType {
case config.TracesDataType:
receiversMap[config.TracesDataType][rcvrNode.componentID] = rcvrNode.Component
case config.MetricsDataType:
receiversMap[config.MetricsDataType][rcvrNode.componentID] = rcvrNode.Component
case config.LogsDataType:
receiversMap[config.LogsDataType][rcvrNode.componentID] = rcvrNode.Component
}
}
}
return receiversMap
}

// TODO handle Capabilities

func (g *pipelinesGraph) HandleZPages(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading

0 comments on commit 5021741

Please sign in to comment.