Skip to content

Commit

Permalink
Move configuration values to AzureConfig type in azure helper package…
Browse files Browse the repository at this point in the history
…, in order to reuse the code between both operators. Input configuration is unchanged due to yaml:",inline"
  • Loading branch information
jsirianni committed May 5, 2021
1 parent 2594a30 commit 1270ee6
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 128 deletions.
46 changes: 46 additions & 0 deletions operator/builtin/input/azure/event_hub_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package azure

import (
"fmt"
)

// AzureConfig is the configuration of a Azure Event Hub input operator.
type AzureConfig struct {
// required
Namespace string `json:"namespace,omitempty" yaml:"namespace,omitempty"`
Name string `json:"name,omitempty" yaml:"name,omitempty"`
Group string `json:"group,omitempty" yaml:"group,omitempty"`
ConnectionString string `json:"connection_string,omitempty" yaml:"connection_string,omitempty"`

// optional
PrefetchCount uint32 `json:"prefetch_count,omitempty" yaml:"prefetch_count,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
}

func (a AzureConfig) Validate() error {
if a.Namespace == "" {
return fmt.Errorf("missing required parameter 'namespace'")
}

if a.Name == "" {
return fmt.Errorf("missing required parameter 'name'")
}

if a.Group == "" {
return fmt.Errorf("missing required parameter 'group'")
}

if a.ConnectionString == "" {
return fmt.Errorf("missing required parameter 'connection_string'")
}

if a.PrefetchCount < 1 {
return fmt.Errorf("invalid value for parameter 'prefetch_count'")
}

if a.StartAt != "beginning" && a.StartAt != "end" {
return fmt.Errorf("invalid value for parameter 'start_at'")
}

return nil
}
123 changes: 123 additions & 0 deletions operator/builtin/input/azure/event_hub_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package azure

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestValidate(t *testing.T) {
cases := []struct {
name string
input AzureConfig
expectErr bool
}{
{
"missing-namespace",
AzureConfig{
Namespace: "",
Name: "john",
Group: "devel",
ConnectionString: "some connection string",
StartAt: "end",
PrefetchCount: 10,
},
true,
},
{
"missing-name",
AzureConfig{
Namespace: "namespace",
Name: "",
Group: "devel",
ConnectionString: "some connection string",
StartAt: "end",
PrefetchCount: 10,
},
true,
},
{
"missing-group",
AzureConfig{
Namespace: "namespace",
Name: "dev",
Group: "",
ConnectionString: "some connection string",
StartAt: "end",
PrefetchCount: 10,
},
true,
},
{
"missing-connection-string",
AzureConfig{
Namespace: "namespace",
Name: "dev",
Group: "dev",
ConnectionString: "",
StartAt: "end",
PrefetchCount: 10,
},
true,
},
{
"invalid-prefetch-count",
AzureConfig{
Namespace: "namespace",
Name: "dev",
Group: "dev",
ConnectionString: "some string",
StartAt: "end",
PrefetchCount: 0,
},
true,
},
{
"invalid-start-at",
AzureConfig{
Namespace: "namespace",
Name: "dev",
Group: "dev",
ConnectionString: "some string",
StartAt: "bad",
PrefetchCount: 10,
},
true,
},
{
"valid-start-at-end",
AzureConfig{
Namespace: "namespace",
Name: "dev",
Group: "dev",
ConnectionString: "some string",
StartAt: "end",
PrefetchCount: 10,
},
false,
},
{
"valid-start-at-beginning",
AzureConfig{
Namespace: "namespace",
Name: "dev",
Group: "dev",
ConnectionString: "some string",
PrefetchCount: 10,
StartAt: "beginning",
},
false,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
err := tc.input.Validate()
if tc.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
39 changes: 8 additions & 31 deletions operator/builtin/input/azure/eventhub/event_hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,18 @@ func init() {
// NewEventHubConfig creates a new Azure Event Hub input config with default values
func NewEventHubConfig(operatorID string) *EventHubInputConfig {
return &EventHubInputConfig{
InputConfig: helper.NewInputConfig(operatorID, operatorName),
PrefetchCount: 1000,
StartAt: "end",
InputConfig: helper.NewInputConfig(operatorID, operatorName),
AzureConfig: azure.AzureConfig{
PrefetchCount: 1000,
StartAt: "end",
},
}
}

// EventHubInputConfig is the configuration of a Azure Event Hub input operator.
type EventHubInputConfig struct {
helper.InputConfig `yaml:",inline"`

// required
Namespace string `json:"namespace,omitempty" yaml:"namespace,omitempty"`
Name string `json:"name,omitempty" yaml:"name,omitempty"`
Group string `json:"group,omitempty" yaml:"group,omitempty"`
ConnectionString string `json:"connection_string,omitempty" yaml:"connection_string,omitempty"`

// optional
PrefetchCount uint32 `json:"prefetch_count,omitempty" yaml:"prefetch_count,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
azure.AzureConfig `yaml:",inline"`
}

// Build will build a Azure Event Hub input operator.
Expand All @@ -46,24 +39,8 @@ func (c *EventHubInputConfig) Build(buildContext operator.BuildContext) ([]opera
return nil, err
}

if c.Namespace == "" {
return nil, fmt.Errorf("missing required %s parameter 'namespace'", operatorName)
}

if c.Name == "" {
return nil, fmt.Errorf("missing required %s parameter 'name'", operatorName)
}

if c.Group == "" {
return nil, fmt.Errorf("missing required %s parameter 'group'", operatorName)
}

if c.ConnectionString == "" {
return nil, fmt.Errorf("missing required %s parameter 'connection_string'", operatorName)
}

if c.PrefetchCount < 1 {
return nil, fmt.Errorf("invalid value '%d' for %s parameter 'prefetch_count'", c.PrefetchCount, operatorName)
if err := c.AzureConfig.Validate(); err != nil {
return nil, err
}

var startAtBegining bool
Expand Down
79 changes: 46 additions & 33 deletions operator/builtin/input/azure/eventhub/event_hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventhub
import (
"testing"

"github.com/observiq/stanza/operator/builtin/input/azure"
"github.com/observiq/stanza/testutil"
"github.com/stretchr/testify/require"
)
Expand All @@ -16,69 +17,81 @@ func TestBuild(t *testing.T) {
{
"default",
EventHubInputConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
PrefetchCount: 1000,
AzureConfig: azure.AzureConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
PrefetchCount: 1000,
},
},
false,
},
{
"prefetch",
EventHubInputConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
PrefetchCount: 100,
AzureConfig: azure.AzureConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
PrefetchCount: 100,
},
},
false,
},
{
"startat-end",
EventHubInputConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
StartAt: "end",
PrefetchCount: 1000,
AzureConfig: azure.AzureConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
StartAt: "end",
PrefetchCount: 1000,
},
},
false,
},
{
"startat-beginning",
EventHubInputConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
StartAt: "beginning",
PrefetchCount: 1000,
AzureConfig: azure.AzureConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
StartAt: "beginning",
PrefetchCount: 1000,
},
},
false,
},
{
"prefetch-invalid",
EventHubInputConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
PrefetchCount: 0,
AzureConfig: azure.AzureConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
PrefetchCount: 0,
},
},
true,
},
{
"default-required-startat-invalid",
EventHubInputConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
StartAt: "invalid",
PrefetchCount: 1000,
AzureConfig: azure.AzureConfig{
Namespace: "test",
Name: "test",
Group: "test",
ConnectionString: "test",
StartAt: "invalid",
PrefetchCount: 1000,
},
},
true,
},
Expand Down
Loading

0 comments on commit 1270ee6

Please sign in to comment.