Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Counter Output Operator #570

Merged
merged 20 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/stanza/init_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
_ "github.com/observiq/stanza/operator/builtin/transformer/retain"
_ "github.com/observiq/stanza/operator/builtin/transformer/router"

_ "github.com/observiq/stanza/operator/builtin/output/count"
_ "github.com/observiq/stanza/operator/builtin/output/drop"
_ "github.com/observiq/stanza/operator/builtin/output/elastic"
_ "github.com/observiq/stanza/operator/builtin/output/file"
Expand Down
77 changes: 77 additions & 0 deletions docs/operators/count_output.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
## `count_output` operator

The `count_output` operator writes JSON-encoded lines to stdout or to a file. Each line reports the number of entries the operator has received since it was started.

Count information currently has the following JSON representation:
```json
{
"entries": <number of entries this operator has received>,
"elapsedMinutes": <number of minutes stanza has been running since the start of this operator>,
"entries/minute": <number of entries per minute the output operator received>,
"timestamp": <current time that this message is being recorded formatted in RFC 3339>
}
```

### Configuration Fields

| Field | Default | Description |
| ---------- | -------------- | ---------------------------------------------------------------------------------------------------------------- |
| `id` | `count_output` | A unique identifier for the operator |
| `path` | | A file path to append the count information to. If no path is provided, count information is written to stdout |
| `duration` | `1m` | How often the count information is written |

### Example Configurations

Configuration

```yaml
pipeline:
- type: generate_input
count: 500
Mrod1598 marked this conversation as resolved.
Show resolved Hide resolved
- type: count_output
```

#### Counting 500 generated lines printed to stdout:

`./stanza -c ./config.yaml`

```json
{"level":"info","timestamp":"2021-08-20T20:09:55.057-0400","message":"Starting stanza agent"}
{"level":"info","timestamp":"2021-08-20T20:09:55.057-0400","message":"Stanza agent started"}
{"entries":500,"elapsedMinutes":2,"entries/minute":250,"timestamp":"2021-08-20T20:11:55-04:00"}
```

#### Configuration going to file:
```yaml
pipeline:
- type: generate_input
count: 500
- type: count_output
path: ./count.json
```

`./stanza -c ./config.yaml`
> No count information is printed to stdout; only the agent logs appear:
```json
{"level":"info","timestamp":"2021-08-20T20:09:28.314-0400","message":"Starting stanza agent"}
{"level":"info","timestamp":"2021-08-20T20:09:28.314-0400","message":"Stanza agent started"}
```

Printing out results of specified file:
```sh
> cat count.json | jq
```
```json
{
"entries": 500,
"elapsedMinutes": 1,
"entries/minute": 500,
"timestamp": "2021-08-20T20:10:28-04:00"
}
{
"entries": 500,
"elapsedMinutes": 2,
"entries/minute": 250,
"timestamp": "2021-08-20T20:11:28-04:00"
}
```
158 changes: 158 additions & 0 deletions operator/builtin/output/count/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package counter

import (
"context"
"encoding/json"
"fmt"
"math"
"os"
"sync"
"sync/atomic"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
)

// CountOutputConfig is the configuration of a count output operator.
type CountOutputConfig struct {
cpheps marked this conversation as resolved.
Show resolved Hide resolved
helper.OutputConfig `yaml:",inline"`
Path string `json:"path,omitempty" yaml:"path,omitempty"`
Duration helper.Duration `json:"duration,omitempty" yaml:"duration,omitempty"`
}

// defaultCounterDuration is the reporting interval that
// NewCounterOutputConfig assigns to a freshly created config.
var defaultCounterDuration = helper.NewDuration(1 * time.Minute)

// init registers the "count_output" operator type together with a builder
// factory that returns a default configuration with an empty operator ID.
func init() {
	newBuilder := func() operator.Builder {
		return NewCounterOutputConfig("")
	}
	operator.Register("count_output", newBuilder)
}

// NewCounterOutputConfig returns a count_output configuration for the given
// operator ID, pre-populated with the default reporting interval.
func NewCounterOutputConfig(operatorID string) *CountOutputConfig {
	cfg := new(CountOutputConfig)
	cfg.OutputConfig = helper.NewOutputConfig(operatorID, "count_output")
	cfg.Duration = defaultCounterDuration
	return cfg
}

// Build will build an instance of the count_output operator.
//
// It returns an error when the embedded output config fails to build or when
// the configured duration is not strictly positive. The returned slice holds
// exactly one operator, a *CountOutput.
func (c CountOutputConfig) Build(bc operator.BuildContext) ([]operator.Operator, error) {
	outputOperator, err := c.OutputConfig.Build(bc)
	if err != nil {
		return nil, err
	}

	// The interval is later handed to time.NewTicker, which panics on a
	// non-positive value; reject it here so misconfiguration surfaces as an
	// error at build time instead of a panic at start.
	interval := c.Duration.Raw()
	if interval <= 0 {
		return nil, fmt.Errorf("duration must be greater than zero, got %s", interval)
	}

	ctx, cancel := context.WithCancel(context.Background())
	counterOperator := &CountOutput{
		OutputOperator: outputOperator,
		ctx:            ctx,
		cancel:         cancel,
		interval:       interval,
		path:           c.Path,
	}

	return []operator.Operator{counterOperator}, nil
}

// CountOutput is an output operator that counts the entries it receives and
// periodically writes a JSON count record to stdout or to a file.
type CountOutput struct {
	// numEntries is the number of entries processed so far. It is accessed
	// only through sync/atomic and is deliberately the first field: on 32-bit
	// platforms only the first word of an allocated struct is guaranteed to
	// be 64-bit aligned, which 64-bit atomic operations require.
	numEntries uint64

	helper.OutputOperator

	// ctx and cancel control the lifetime of the reporting goroutine.
	ctx    context.Context
	cancel context.CancelFunc
	// wg lets Stop wait for the reporting goroutine to exit.
	wg sync.WaitGroup

	// interval is the time between two count records.
	interval time.Duration
	// start is the time at which Start was called.
	start time.Time

	// path is the output file path; empty means stdout.
	path string
	// file is the opened output file, or nil when writing to stdout.
	file *os.File
	// encoder writes count records to the selected output.
	encoder *json.Encoder
}

// Process atomically adds one to the operator's entry counter. The context
// and the entry itself are ignored, and the returned error is always nil.
func (co *CountOutput) Process(_ context.Context, _ *entry.Entry) error {
	const delta uint64 = 1
	atomic.AddUint64(&co.numEntries, delta)
	return nil
}

// Start selects the output destination (stdout or the configured file),
// records the start time, and launches the background goroutine that writes
// count records. It returns an error if the destination cannot be set up.
func (co *CountOutput) Start() error {
	if err := co.determineOutput(); err != nil {
		return err
	}

	co.wg.Add(1)
	co.start = time.Now()
	go co.startCounting()
	return nil
}

// Stop cancels the reporting goroutine, waits for it to finish, and then
// closes the output file if one was opened. It returns the file's close
// error, or nil when the operator was writing to stdout.
func (co *CountOutput) Stop() error {
	co.cancel()
	co.wg.Wait()

	if co.file == nil {
		return nil
	}
	return co.file.Close()
}

// startCounting writes one count record on every tick of the configured
// interval. It exits when the operator's context is cancelled or when writing
// a record fails, and marks the wait group as done on the way out.
func (co *CountOutput) startCounting() {
	defer co.wg.Done()

	tick := time.NewTicker(co.interval)
	defer tick.Stop()

	for {
		select {
		case <-co.ctx.Done():
			return
		case <-tick.C:
			if err := co.logCount(); err != nil {
				return
			}
		}
	}
}

// countObject is the JSON record emitted on every reporting interval.
type countObject struct {
	// Entries is the total number of entries counted so far.
	Entries uint64 `json:"entries"`
	// ElapsedMinutes is the time, in minutes, since the operator was started.
	ElapsedMinutes float64 `json:"elapsedMinutes"`
	// EntriesPerMinute is the entry count divided by the elapsed minutes.
	EntriesPerMinute float64 `json:"entries/minute"`
	// Timestamp is the time the record was produced, formatted as RFC 3339.
	Timestamp string `json:"timestamp"`
}

func (co *CountOutput) logCount() error {
now := time.Now()
elapsedMinutes := now.Sub(co.start).Minutes()
entriesPerMinute := float64(atomic.LoadUint64(&co.numEntries)) / math.Max(elapsedMinutes, 1)
msg := &countObject{
Entries: co.numEntries,
schmikei marked this conversation as resolved.
Show resolved Hide resolved
ElapsedMinutes: elapsedMinutes,
EntriesPerMinute: entriesPerMinute,
Timestamp: now.Format(time.RFC3339),
}
return co.encoder.Encode(msg)
}

func (co *CountOutput) determineOutput() error {
if co.path == "" {
co.encoder = json.NewEncoder(os.Stdout)
return nil
}

file, err := os.OpenFile(co.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
cpheps marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("unable to write counter info to file located at %s: %w", co.path, err)
}
co.file = file
co.encoder = json.NewEncoder(file)
return nil
}
129 changes: 129 additions & 0 deletions operator/builtin/output/count/count_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package counter

import (
"context"
"encoding/json"
"io/ioutil"
"os"
"testing"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator/helper"
"github.com/observiq/stanza/testutil"
"github.com/stretchr/testify/require"
)

func TestBuildValid(t *testing.T) {
cfg := NewCounterOutputConfig("test")
ctx := testutil.NewBuildContext(t)
ops, err := cfg.Build(ctx)
require.NoError(t, err)
op := ops[0]
require.IsType(t, &CountOutput{}, op)
}

func TestBuildInvalid(t *testing.T) {
cfg := NewCounterOutputConfig("test")
ctx := testutil.NewBuildContext(t)
ctx.Logger = nil
_, err := cfg.Build(ctx)
require.Error(t, err)
require.Contains(t, err.Error(), "build context is missing a logger")
}

// TestFileCounterOutput runs the operator against a temporary file with a
// one-second interval, processes a single entry, waits for the file to grow,
// and verifies the decoded count record.
func TestFileCounterOutput(t *testing.T) {
	cfg := NewCounterOutputConfig("test")

	tmpFile, err := ioutil.TempFile("", "")
	require.NoError(t, err)
	defer os.Remove(tmpFile.Name())
	// Only the path is needed; the operator opens the file itself, so release
	// this handle rather than leaking it for the duration of the test.
	require.NoError(t, tmpFile.Close())

	cfg.Path = tmpFile.Name()
	cfg.Duration = helper.NewDuration(1 * time.Second)

	ctx := testutil.NewBuildContext(t)
	ops, err := cfg.Build(ctx)
	require.NoError(t, err)

	counterOutput := ops[0].(*CountOutput)

	err = counterOutput.Start()
	require.NoError(t, err)
	defer func() {
		err := counterOutput.Stop()
		require.NoError(t, err)
	}()

	e := entry.New()
	err = counterOutput.Process(context.Background(), e)
	require.NoError(t, err)
	require.Equal(t, uint64(1), counterOutput.numEntries)

	stat, err := os.Stat(tmpFile.Name())
	require.NoError(t, err)

	initialSize := stat.Size()

	to, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Poll the file size until the first record has been written.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-to.Done():
			require.FailNow(t, "timed out waiting for file to be written to")
		case <-ticker.C:
		}
		size, err := os.Stat(tmpFile.Name())
		require.NoError(t, err)
		if size.Size() != initialSize {
			break
		}
	}

	content, err := ioutil.ReadFile(tmpFile.Name())
	require.NoError(t, err)

	var object countObject
	err = json.Unmarshal(content, &object)
	require.NoError(t, err)

	require.Equal(t, uint64(1), object.Entries)
	require.GreaterOrEqual(t, object.EntriesPerMinute, 0.0)
	require.GreaterOrEqual(t, object.ElapsedMinutes, 0.0)
}

func TestStartStdout(t *testing.T) {
cfg := NewCounterOutputConfig("test")

ctx := testutil.NewBuildContext(t)
ops, err := cfg.Build(ctx)
require.NoError(t, err)

counterOutput := ops[0].(*CountOutput)

err = counterOutput.Start()
defer func() {
err := counterOutput.Stop()
require.NoError(t, err)
}()
require.NoError(t, err)
}

// TestStartFailure checks that Start reports an error when the configured
// output path cannot be opened, and that Stop still succeeds afterwards.
func TestStartFailure(t *testing.T) {
	config := NewCounterOutputConfig("test")
	config.Path = "/a/path/to/a/nonexistent/file/hopefully"

	built, err := config.Build(testutil.NewBuildContext(t))
	require.NoError(t, err)

	op := built[0].(*CountOutput)

	startErr := op.Start()
	defer func() {
		require.NoError(t, op.Stop())
	}()
	require.Error(t, startErr)
	require.Contains(t, startErr.Error(), "unable to write counter info to file")
}