Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
Fix plugin inputs (#156)
Browse files Browse the repository at this point in the history
* Add testing for IDs

* Test Ops is pipeline

* Add testing template

* Change how default outputs are set

* Add testing for outputs

* Remove old default outputs func

* Add ID testing

* Add testing for output handling

* Update Tests

* Update mock comment

* Implement PR feedback

* Update comment

* Implement PR feedback

* Update Comments

* Force CI

* Implement PR feedback

* Add new test case
  • Loading branch information
Mrod1598 authored Jun 1, 2021
1 parent 6184b76 commit aee67dd
Show file tree
Hide file tree
Showing 12 changed files with 462 additions and 14 deletions.
15 changes: 15 additions & 0 deletions operator/builtin/transformer/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func (c RouterOperatorConfig) Build(bc operator.BuildContext) ([]operator.Operat
return []operator.Operator{routerOperator}, nil
}

// BuildsMultipleOps returns false
func (c RouterOperatorConfig) BuildsMultipleOps() bool { return false }

// RouterOperator is an operator that routes entries based on matching expressions
type RouterOperator struct {
helper.BasicOperator
Expand Down Expand Up @@ -157,6 +160,15 @@ func (p *RouterOperator) Outputs() []operator.Operator {
return outputs
}

// GetOutputIDs will return all connected operators.
func (p *RouterOperator) GetOutputIDs() []string {
outputs := make([]string, 0, len(p.routes))
for _, route := range p.routes {
outputs = append(outputs, route.OutputIDs...)
}
return outputs
}

// SetOutputs will set the outputs of the router operator.
func (p *RouterOperator) SetOutputs(operators []operator.Operator) error {
for _, route := range p.routes {
Expand All @@ -170,6 +182,9 @@ func (p *RouterOperator) SetOutputs(operators []operator.Operator) error {
return nil
}

// SetOutputIDs will do nothing.
func (p *RouterOperator) SetOutputIDs(opIDs []string) {}

// findOperators will find a subset of operators from a collection.
func (p *RouterOperator) findOperators(operators []operator.Operator, operatorIDs []string) ([]operator.Operator, error) {
result := make([]operator.Operator, 0)
Expand Down
1 change: 1 addition & 0 deletions operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Builder interface {
ID() string
Type() string
Build(BuildContext) ([]Operator, error)
BuildsMultipleOps() bool
}

// UnmarshalJSON will unmarshal a config from JSON.
Expand Down
1 change: 1 addition & 0 deletions operator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type FakeBuilder struct {
func (f *FakeBuilder) Build(context BuildContext) ([]Operator, error) { return nil, nil }
func (f *FakeBuilder) ID() string { return "plugin" }
func (f *FakeBuilder) Type() string { return "plugin" }
func (f *FakeBuilder) BuildsMultipleOps() bool { return false }

func TestUnmarshalJSONErrors(t *testing.T) {
t.Cleanup(func() {
Expand Down
12 changes: 12 additions & 0 deletions operator/helper/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func (c OutputConfig) Build(context operator.BuildContext) (OutputOperator, erro
return outputOperator, nil
}

// BuildsMultipleOps Returns false
func (c OutputConfig) BuildsMultipleOps() bool { return false }

// OutputOperator provides a basic implementation of an output operator.
type OutputOperator struct {
BasicOperator
Expand All @@ -65,10 +68,19 @@ func (o *OutputOperator) Outputs() []operator.Operator {
return []operator.Operator{}
}

// GetOutputIDs will always return an empty array for an output ID.
func (o *OutputOperator) GetOutputIDs() []string {
return []string{}
}

// SetOutputs will return an error if called.
func (o *OutputOperator) SetOutputs(operators []operator.Operator) error {
return errors.NewError(
"Operator can not output, but is attempting to set an output.",
"This is an unexpected internal error. Please submit a bug/issue.",
)
}

// SetOutputIDs will return nothing and does nothing.
func (o *OutputOperator) SetOutputIDs(opIDs []string) {
}
18 changes: 15 additions & 3 deletions operator/helper/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ func (c WriterConfig) Build(bc operator.BuildContext) (WriterOperator, error) {

// Namespace all the output IDs
namespacedIDs := c.OutputIDs.WithNamespace(bc)
if len(namespacedIDs) == 0 {
namespacedIDs = bc.DefaultOutputIDs
}

writer := WriterOperator{
OutputIDs: namespacedIDs,
Expand All @@ -56,6 +53,11 @@ func (c WriterConfig) Build(bc operator.BuildContext) (WriterOperator, error) {
return writer, nil
}

// BuildsMultipleOps Returns false as a base line
func (c WriterConfig) BuildsMultipleOps() bool {
return false
}

// WriterOperator is an operator that can write to other operators.
type WriterOperator struct {
BasicOperator
Expand Down Expand Up @@ -84,6 +86,11 @@ func (w *WriterOperator) Outputs() []operator.Operator {
return w.OutputOperators
}

// GetOutputIDs returns the output IDs of the writer operator.
func (w *WriterOperator) GetOutputIDs() []string {
return w.OutputIDs
}

// SetOutputs will set the outputs of the operator.
func (w *WriterOperator) SetOutputs(operators []operator.Operator) error {
outputOperators := make([]operator.Operator, 0)
Expand All @@ -105,6 +112,11 @@ func (w *WriterOperator) SetOutputs(operators []operator.Operator) error {
return nil
}

// SetOutputIDs will set the outputs of the operator.
func (w *WriterOperator) SetOutputIDs(opIds []string) {
w.OutputIDs = opIds
}

// FindOperator will find an operator matching the supplied id.
func (w *WriterOperator) findOperator(operators []operator.Operator, operatorID string) (operator.Operator, bool) {
for _, operator := range operators {
Expand Down
4 changes: 4 additions & 0 deletions operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ type Operator interface {
CanOutput() bool
// Outputs returns the list of connected outputs.
Outputs() []Operator
// GetOutputIDs returns the list of connected outputs.
GetOutputIDs() []string
// SetOutputs will set the connected outputs.
SetOutputs([]Operator) error
// SetOutputIDs will set the connected outputs' IDs.
SetOutputIDs([]string)

// CanProcess indicates if the operator will process entries from other operators.
CanProcess() bool
Expand Down
53 changes: 42 additions & 11 deletions pipeline/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,25 @@ import (
// Config is the configuration of a pipeline.
type Config []operator.Config

// BuildOperators builds the operators from the list of configs into operators
// BuildOperators builds the operators from the list of configs into operators.
func (c Config) BuildOperators(bc operator.BuildContext) ([]operator.Operator, error) {
// buildsMulti's key represents an operator's ID that builds multiple operators, e.g. Plugins.
// the value is the plugin's first operator's ID.
buildsMulti := make(map[string]string)
operators := make([]operator.Operator, 0, len(c))
for i, builder := range c {
nbc := getBuildContextWithDefaultOutput(c, i, bc)
op, err := builder.Build(nbc)
for _, builder := range c {
op, err := builder.Build(bc)
if err != nil {
return nil, err
}

if builder.BuildsMultipleOps() {
buildsMulti[bc.PrependNamespace(builder.ID())] = op[0].ID()
}
operators = append(operators, op...)
}
SetOutputIDs(operators, buildsMulti)

return operators, nil
}

Expand All @@ -53,12 +61,35 @@ func (c Config) BuildPipeline(bc operator.BuildContext, defaultOperator operator
return NewDirectedPipeline(operators)
}

func getBuildContextWithDefaultOutput(configs []operator.Config, i int, bc operator.BuildContext) operator.BuildContext {
if i+1 >= len(configs) {
return bc
}
// SetOutputIDs Loops through all the operators and sets a default output to the next operator in the slice.
// Additionally, if the output is set to a plugin, it sets the output to the first operator in the plugins pipeline.
func SetOutputIDs(operators []operator.Operator, buildsMulti map[string]string) error {
for i, op := range operators {
// because no output is specified at this point for the last operator,
// it will always be empty and there is nothing after it to automatically point towards, so we break the loop
if i+1 == len(operators) {
break
}

id := configs[i+1].ID()
id = bc.PrependNamespace(id)
return bc.WithDefaultOutputIDs([]string{id})
if len(op.GetOutputIDs()) == 0 {
op.SetOutputIDs([]string{operators[i+1].ID()})
continue
}

// Check if there are any plugins within the outputIDs of the operator. If there are, change the output to be the first op in the plugin
allOutputs := []string{}
pluginFound := false
for _, id := range op.GetOutputIDs() {
if pid, ok := buildsMulti[id]; ok {
id = pid
pluginFound = true
}
allOutputs = append(allOutputs, id)
}

if pluginFound {
op.SetOutputIDs(allOutputs)
}
}
return nil
}
2 changes: 2 additions & 0 deletions plugin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func (c *Config) Build(bc operator.BuildContext) ([]operator.Operator, error) {
return pipelineConfig.Pipeline.BuildOperators(nbc)
}

func (c *Config) BuildsMultipleOps() bool { return true }

func (c *Config) getRenderParams(bc operator.BuildContext) map[string]interface{} {
// Copy the parameters to avoid mutating them
params := map[string]interface{}{}
Expand Down
Loading

0 comments on commit aee67dd

Please sign in to comment.