diff --git a/operator/builtin/transformer/router/router.go b/operator/builtin/transformer/router/router.go index f5260f5a..989a760a 100644 --- a/operator/builtin/transformer/router/router.go +++ b/operator/builtin/transformer/router/router.go @@ -95,6 +95,9 @@ func (c RouterOperatorConfig) Build(bc operator.BuildContext) ([]operator.Operat return []operator.Operator{routerOperator}, nil } +// BuildsMultipleOps returns false +func (c RouterOperatorConfig) BuildsMultipleOps() bool { return false } + // RouterOperator is an operator that routes entries based on matching expressions type RouterOperator struct { helper.BasicOperator @@ -157,6 +160,15 @@ func (p *RouterOperator) Outputs() []operator.Operator { return outputs } +// GetOutputIDs will return all connected operators. +func (p *RouterOperator) GetOutputIDs() []string { + outputs := make([]string, 0, len(p.routes)) + for _, route := range p.routes { + outputs = append(outputs, route.OutputIDs...) + } + return outputs +} + // SetOutputs will set the outputs of the router operator. func (p *RouterOperator) SetOutputs(operators []operator.Operator) error { for _, route := range p.routes { @@ -170,6 +182,9 @@ func (p *RouterOperator) SetOutputs(operators []operator.Operator) error { return nil } +// SetOutputIDs will do nothing. +func (p *RouterOperator) SetOutputIDs(opIDs []string) {} + // findOperators will find a subset of operators from a collection. func (p *RouterOperator) findOperators(operators []operator.Operator, operatorIDs []string) ([]operator.Operator, error) { result := make([]operator.Operator, 0) diff --git a/operator/config.go b/operator/config.go index c48816b3..8ffb70cd 100644 --- a/operator/config.go +++ b/operator/config.go @@ -31,6 +31,7 @@ type Builder interface { ID() string Type() string Build(BuildContext) ([]Operator, error) + BuildsMultipleOps() bool } // UnmarshalJSON will unmarshal a config from JSON. diff --git a/operator/config_test.go b/operator/config_test.go index 0c2260cd..3fc0dd0f 100644 --- a/operator/config_test.go +++ b/operator/config_test.go @@ -31,6 +31,7 @@ type FakeBuilder struct { func (f *FakeBuilder) Build(context BuildContext) ([]Operator, error) { return nil, nil } func (f *FakeBuilder) ID() string { return "plugin" } func (f *FakeBuilder) Type() string { return "plugin" } +func (f *FakeBuilder) BuildsMultipleOps() bool { return false } func TestUnmarshalJSONErrors(t *testing.T) { t.Cleanup(func() { diff --git a/operator/helper/output.go b/operator/helper/output.go index 05976245..dafc20a5 100644 --- a/operator/helper/output.go +++ b/operator/helper/output.go @@ -45,6 +45,9 @@ func (c OutputConfig) Build(context operator.BuildContext) (OutputOperator, erro return outputOperator, nil } +// BuildsMultipleOps Returns false +func (c OutputConfig) BuildsMultipleOps() bool { return false } + // OutputOperator provides a basic implementation of an output operator. type OutputOperator struct { BasicOperator @@ -65,6 +68,11 @@ func (o *OutputOperator) Outputs() []operator.Operator { return []operator.Operator{} } +// GetOutputIDs will always return an empty array for an output ID. +func (o *OutputOperator) GetOutputIDs() []string { + return []string{} +} + // SetOutputs will return an error if called. func (o *OutputOperator) SetOutputs(operators []operator.Operator) error { return errors.NewError( @@ -72,3 +80,7 @@ func (o *OutputOperator) SetOutputs(operators []operator.Operator) error { "This is an unexpected internal error. Please submit a bug/issue.", ) } + +// SetOutputIDs will return nothing and does nothing. +func (o *OutputOperator) SetOutputIDs(opIDs []string) { +} diff --git a/operator/helper/writer.go b/operator/helper/writer.go index c2f42baa..4157ce92 100644 --- a/operator/helper/writer.go +++ b/operator/helper/writer.go @@ -45,9 +45,6 @@ func (c WriterConfig) Build(bc operator.BuildContext) (WriterOperator, error) { // Namespace all the output IDs namespacedIDs := c.OutputIDs.WithNamespace(bc) - if len(namespacedIDs) == 0 { - namespacedIDs = bc.DefaultOutputIDs - } writer := WriterOperator{ OutputIDs: namespacedIDs, @@ -56,6 +53,11 @@ func (c WriterConfig) Build(bc operator.BuildContext) (WriterOperator, error) { return writer, nil } +// BuildsMultipleOps Returns false as a base line +func (c WriterConfig) BuildsMultipleOps() bool { + return false +} + // WriterOperator is an operator that can write to other operators. type WriterOperator struct { BasicOperator @@ -84,6 +86,11 @@ func (w *WriterOperator) Outputs() []operator.Operator { return w.OutputOperators } +// GetOutputIDs returns the output IDs of the writer operator. +func (w *WriterOperator) GetOutputIDs() []string { + return w.OutputIDs +} + // SetOutputs will set the outputs of the operator. func (w *WriterOperator) SetOutputs(operators []operator.Operator) error { outputOperators := make([]operator.Operator, 0) @@ -105,6 +112,11 @@ func (w *WriterOperator) SetOutputs(operators []operator.Operator) error { return nil } +// SetOutputIDs will set the outputs of the operator. +func (w *WriterOperator) SetOutputIDs(opIds []string) { + w.OutputIDs = opIds +} + // FindOperator will find an operator matching the supplied id. func (w *WriterOperator) findOperator(operators []operator.Operator, operatorID string) (operator.Operator, bool) { for _, operator := range operators { diff --git a/operator/operator.go b/operator/operator.go index d4e28469..ba7fecfc 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -40,8 +40,12 @@ type Operator interface { CanOutput() bool // Outputs returns the list of connected outputs. Outputs() []Operator + // GetOutputIDs returns the list of connected outputs. + GetOutputIDs() []string // SetOutputs will set the connected outputs. SetOutputs([]Operator) error + // SetOutputIDs will set the connected outputs' IDs. + SetOutputIDs([]string) // CanProcess indicates if the operator will process entries from other operators. CanProcess() bool diff --git a/pipeline/config.go b/pipeline/config.go index cf43ef02..856897d3 100644 --- a/pipeline/config.go +++ b/pipeline/config.go @@ -21,17 +21,25 @@ import ( // Config is the configuration of a pipeline. type Config []operator.Config -// BuildOperators builds the operators from the list of configs into operators +// BuildOperators builds the operators from the list of configs into operators. func (c Config) BuildOperators(bc operator.BuildContext) ([]operator.Operator, error) { + // buildsMulti's key represents an operator's ID that builds multiple operators, e.g. Plugins. + // the value is the plugin's first operator's ID. + buildsMulti := make(map[string]string) operators := make([]operator.Operator, 0, len(c)) - for i, builder := range c { - nbc := getBuildContextWithDefaultOutput(c, i, bc) - op, err := builder.Build(nbc) + for _, builder := range c { + op, err := builder.Build(bc) if err != nil { return nil, err } + + if builder.BuildsMultipleOps() { + buildsMulti[bc.PrependNamespace(builder.ID())] = op[0].ID() + } operators = append(operators, op...) } + SetOutputIDs(operators, buildsMulti) + return operators, nil } @@ -53,12 +61,35 @@ func (c Config) BuildPipeline(bc operator.BuildContext, defaultOperator operator return NewDirectedPipeline(operators) } -func getBuildContextWithDefaultOutput(configs []operator.Config, i int, bc operator.BuildContext) operator.BuildContext { - if i+1 >= len(configs) { - return bc - } +// SetOutputIDs Loops through all the operators and sets a default output to the next operator in the slice. +// Additionally, if the output is set to a plugin, it sets the output to the first operator in the plugins pipeline. +func SetOutputIDs(operators []operator.Operator, buildsMulti map[string]string) error { + for i, op := range operators { + // because no output is specified at this point for the last operator, + // it will always be empty and there is nothing after it to automatically point towards, so we break the loop + if i+1 == len(operators) { + break + } - id := configs[i+1].ID() - id = bc.PrependNamespace(id) - return bc.WithDefaultOutputIDs([]string{id}) + if len(op.GetOutputIDs()) == 0 { + op.SetOutputIDs([]string{operators[i+1].ID()}) + continue + } + + // Check if there are any plugins within the outputIDs of the operator. If there are, change the output to be the first op in the plugin + allOutputs := []string{} + pluginFound := false + for _, id := range op.GetOutputIDs() { + if pid, ok := buildsMulti[id]; ok { + id = pid + pluginFound = true + } + allOutputs = append(allOutputs, id) + } + + if pluginFound { + op.SetOutputIDs(allOutputs) + } + } + return nil } diff --git a/plugin/config.go b/plugin/config.go index bd6728c9..d3c4751a 100644 --- a/plugin/config.go +++ b/plugin/config.go @@ -60,6 +60,8 @@ func (c *Config) Build(bc operator.BuildContext) ([]operator.Operator, error) { return pipelineConfig.Pipeline.BuildOperators(nbc) } +func (c *Config) BuildsMultipleOps() bool { return true } + func (c *Config) getRenderParams(bc operator.BuildContext) map[string]interface{} { // Copy the parameters to avoid mutating them params := map[string]interface{}{} diff --git a/plugin/config_test.go b/plugin/config_test.go index 58ed601c..9a10e010 100644 --- a/plugin/config_test.go +++ b/plugin/config_test.go @@ -116,6 +116,7 @@ pipeline: type: noop - id: noop1 type: noop + output: {{ .output }} `) pluginName := "my_plugin" pluginVar, err := NewPlugin(pluginName, pluginContent) @@ -238,6 +239,334 @@ pipeline: } } +type PluginOutputIDTestCase struct { + Name string + PluginConfig pipeline.Config + ExpectedOpIDs map[string][]string +} + +func TestPluginOutputIDs(t *testing.T) { + // TODO: ids shouldn't need to be specified once autogen IDs are implemented + pluginContent := []byte(` +parameters: +pipeline: + - type: noop + - id: noop1 + type: noop + output: {{ .output }} +`) + pluginName := "my_plugin" + pluginVar, err := NewPlugin(pluginName, pluginContent) + require.NoError(t, err) + operator.RegisterPlugin(pluginVar.ID, pluginVar.NewBuilder) + + // TODO: remove ID assignment + pluginContent2 := []byte(` +parameters: +pipeline: + - type: noop + - id: noop1 + type: noop + output: {{ .output }} +`) + secondPlugin := "secondPlugin" + secondPluginVar, err := NewPlugin(secondPlugin, pluginContent2) + require.NoError(t, err) + operator.RegisterPlugin(secondPluginVar.ID, secondPluginVar.NewBuilder) + + pluginContent3 := []byte(` +parameters: +pipeline: + - type: my_plugin + - type: noop + output: {{ .output }} +`) + layeredPlugin := "layeredPlugin" + layeredPluginVar, err := NewPlugin(layeredPlugin, pluginContent3) + require.NoError(t, err) + operator.RegisterPlugin(layeredPluginVar.ID, layeredPluginVar.NewBuilder) + + cases := []PluginOutputIDTestCase{ + { + Name: "same_op_outside_plugin", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: noop.NewNoopOperatorConfig("noop"), + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + // TODO: ID should be noop to start then auto gened to noop1 + Builder: noop.NewNoopOperatorConfig("noop1"), + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$.noop": {"$." + pluginName + ".noop"}, + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." + pluginName + ".noop1": {"$.noop1"}, + }, + }, + { + Name: "two_plugins_with_same_ops", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: secondPlugin, + OperatorType: secondPlugin, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: secondPluginVar, + }, + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." + pluginName + ".noop1": {"$." + secondPlugin + ".noop"}, + "$." + secondPlugin + ".noop": {"$." + secondPlugin + ".noop1"}, + }, + }, + { + Name: "two_plugins_specified_output", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + OutputIDs: []string{"noop"}, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: secondPlugin, + OperatorType: secondPlugin, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: secondPluginVar, + }, + }, + operator.Config{ + Builder: noop.NewNoopOperatorConfig("noop"), + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." + pluginName + ".noop1": {"$.noop"}, + "$." + secondPlugin + ".noop": {"$." + secondPlugin + ".noop1"}, + "$." + secondPlugin + ".noop1": {"$.noop"}, + }, + }, + { + Name: "two_plugins_output_to_non_sequential_plugin", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + OutputIDs: []string{secondPlugin}, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + Builder: noop.NewNoopOperatorConfig("noop"), + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: secondPlugin, + OperatorType: secondPlugin, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: secondPluginVar, + }, + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." + pluginName + ".noop1": {"$." + secondPlugin + ".noop"}, + "$.noop": {"$." + secondPlugin + ".noop"}, + "$." + secondPlugin + ".noop": {"$." + secondPlugin + ".noop1"}, + }, + }, + { + Name: "two_plugins_with_multiple_outside_ops", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: noop.NewNoopOperatorConfig("noop"), + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + // TODO: ID should be noop to start then auto gened to noop1 + Builder: noop.NewNoopOperatorConfig("noop1"), + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: secondPlugin, + OperatorType: secondPlugin, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: secondPluginVar, + }, + }, + operator.Config{ + // TODO: ID should be noop to start then auto gened to noop2 + Builder: noop.NewNoopOperatorConfig("noop2"), + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$.noop": {"$." + pluginName + ".noop"}, + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." + pluginName + ".noop1": {"$.noop1"}, + "$.noop1": {"$." + secondPlugin + ".noop"}, + "$." + secondPlugin + ".noop": {"$." + secondPlugin + ".noop1"}, + "$." + secondPlugin + ".noop1": {"$.noop2"}, + }, + }, + { + Name: "two_plugins_of_same_type", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName, + OperatorType: pluginName, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: pluginVar, + }, + }, + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: pluginName + "1", + OperatorType: pluginName, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: secondPluginVar, + }, + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$." + pluginName + ".noop": {"$." + pluginName + ".noop1"}, + "$." + pluginName + ".noop1": {"$." + pluginName + "1.noop"}, + "$." + pluginName + "1.noop": {"$." + pluginName + "1.noop1"}, + }, + }, + { + Name: "plugin_within_a_plugin", + PluginConfig: func() []operator.Config { + return pipeline.Config{ + operator.Config{ + Builder: &Config{ + WriterConfig: helper.WriterConfig{ + BasicConfig: helper.BasicConfig{ + OperatorID: layeredPlugin, + OperatorType: layeredPlugin, + }, + }, + Parameters: map[string]interface{}{}, + Plugin: layeredPluginVar, + }, + }, + operator.Config{ + Builder: noop.NewNoopOperatorConfig("noop"), + }, + } + }(), + ExpectedOpIDs: map[string][]string{ + "$.layeredPlugin." + pluginName + ".noop": {"$.layeredPlugin." + pluginName + ".noop1"}, + "$.layeredPlugin." + pluginName + ".noop1": {"$.layeredPlugin.noop"}, + "$.layeredPlugin.noop": {"$.noop"}, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + ops, err := tc.PluginConfig.BuildOperators(testutil.NewBuildContext(t)) + require.NoError(t, err) + + for i, op := range ops { + if i+1 < len(ops) { + out := op.GetOutputIDs() + t.Log("ID:" + op.ID()) + require.Equal(t, tc.ExpectedOpIDs[op.ID()], out) + } + } + }) + } +} + func TestBuildRecursiveFails(t *testing.T) { pluginConfig1 := []byte(` pipeline: diff --git a/testutil/mocks.go b/testutil/mocks.go index 65af6835..8698167b 100644 --- a/testutil/mocks.go +++ b/testutil/mocks.go @@ -65,9 +65,15 @@ func (f *FakeOutput) Logger() *zap.SugaredLogger { return f.SugaredLogger } // Outputs always returns nil for a fake output func (f *FakeOutput) Outputs() []operator.Operator { return nil } +// Outputs always returns nil for a fake output +func (f *FakeOutput) GetOutputIDs() []string { return nil } + // SetOutputs immediately returns nil for a fake output func (f *FakeOutput) SetOutputs(outputs []operator.Operator) error { return nil } +// SetOutputIDs immediately returns nil for a fake output +func (f *FakeOutput) SetOutputIDs(s []string) {} + // Start immediately returns nil for a fake output func (f *FakeOutput) Start(_ operator.Persister) error { return nil } diff --git a/testutil/operator.go b/testutil/operator.go index 20ad941b..a255be13 100644 --- a/testutil/operator.go +++ b/testutil/operator.go @@ -46,6 +46,22 @@ func (_m *Operator) CanProcess() bool { return r0 } +// GetOutputIDs provides a mock function with given fields: +func (_m *Operator) GetOutputIDs() []string { + ret := _m.Called() + + var r0 []string + if rf, ok := ret.Get(0).(func() []string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + return r0 +} + // ID provides a mock function with given fields: func (_m *Operator) ID() string { ret := _m.Called() @@ -106,6 +122,11 @@ func (_m *Operator) Process(_a0 context.Context, _a1 *entry.Entry) error { return r0 } +// SetOutputIDs provides a mock function with given fields: _a0 +func (_m *Operator) SetOutputIDs(_a0 []string) { + _m.Called(_a0) +} + // SetOutputs provides a mock function with given fields: _a0 func (_m *Operator) SetOutputs(_a0 []operator.Operator) error { ret := _m.Called(_a0) diff --git a/testutil/operator_builder.go b/testutil/operator_builder.go index 653bd854..b14fe035 100644 --- a/testutil/operator_builder.go +++ b/testutil/operator_builder.go @@ -35,6 +35,20 @@ func (_m *OperatorBuilder) Build(_a0 operator.BuildContext) ([]operator.Operator return r0, r1 } +// BuildsMultipleOps provides a mock function with given fields: +func (_m *OperatorBuilder) BuildsMultipleOps() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + // ID provides a mock function with given fields: func (_m *OperatorBuilder) ID() string { ret := _m.Called()