Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
jsirianni committed May 24, 2021
2 parents f947e6c + 32e97be commit 2a0910e
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 10 deletions.
10 changes: 10 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: 2
updates:
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "monthly"
- package-ecosystem: "gomod"
directory: "/internal/tools"
schedule:
interval: "monthly"
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.14.2] - 2021-05-24

### Changed
- Make buffer max chunk delay reconfigurable on the fly [PR313](https://github.com/observIQ/stanza/pull/313)

## [0.14.1] - 2021-05-20

### Added
- Added optional network metadata labels to tcp / udp operators [PR302](https://github.com/observIQ/stanza/pull/302)
- Added AWS Cloudwatch Logs input operator [PR289](https://github.com/observIQ/stanza/pull/289)

## [0.14.0] - 2021-05-07

### Added
Expand Down
1 change: 1 addition & 0 deletions docs/operators/tcp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op
| `write_to` | $ | The record [field](/docs/types/field.md) written to when creating a new log entry |
| `labels` | {} | A map of `key: value` labels to add to the entry's labels |
| `resource` | {} | A map of `key: value` labels to add to the entry's resource |
| `add_labels` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` labels |

#### TLS Configuration

Expand Down
1 change: 1 addition & 0 deletions docs/operators/udp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The `udp_input` operator listens for logs from UDP packets.
| `write_to` | $ | The record [field](/docs/types/field.md) written to when creating a new log entry |
| `labels` | {} | A map of `key: value` labels to add to the entry's labels |
| `resource` | {} | A map of `key: value` labels to add to the entry's resource |
| `add_labels` | false | Adds `net.transport`, `net.peer.ip`, `net.peer.port`, `net.host.ip` and `net.host.port` labels |

### Example Configurations

Expand Down
5 changes: 5 additions & 0 deletions operator/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
Expand All @@ -16,6 +17,10 @@ type Buffer interface {
ReadWait(context.Context, []*entry.Entry) (Clearer, int, error)
ReadChunk(context.Context) ([]*entry.Entry, Clearer, error)
Close() error
MaxChunkDelay() time.Duration
MaxChunkSize() uint
SetMaxChunkDelay(time.Duration)
SetMaxChunkSize(uint)
}

// Config is a struct that wraps a Builder
Expand Down
30 changes: 28 additions & 2 deletions operator/buffer/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type DiskBuffer struct {

maxChunkDelay time.Duration
maxChunkSize uint

reconfigMutex sync.RWMutex
}

// NewDiskBuffer creates a new DiskBuffer
Expand Down Expand Up @@ -242,15 +244,15 @@ LOOP:

// ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation
func (d *DiskBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) {
entries := make([]*entry.Entry, d.maxChunkSize)
entries := make([]*entry.Entry, d.MaxChunkSize())
for {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}

ctx, cancel := context.WithTimeout(ctx, d.maxChunkDelay)
ctx, cancel := context.WithTimeout(ctx, d.MaxChunkDelay())
defer cancel()
flushFunc, n, err := d.ReadWait(ctx, entries)
if n > 0 {
Expand Down Expand Up @@ -313,6 +315,30 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (f Clearer, i int, err error) {
return d.newClearer(newRead), readCount, nil
}

func (d *DiskBuffer) MaxChunkSize() uint {
d.reconfigMutex.RLock()
defer d.reconfigMutex.RUnlock()
return d.maxChunkSize
}

func (d *DiskBuffer) MaxChunkDelay() time.Duration {
d.reconfigMutex.RLock()
defer d.reconfigMutex.RUnlock()
return d.maxChunkDelay
}

func (d *DiskBuffer) SetMaxChunkSize(size uint) {
d.reconfigMutex.Lock()
d.maxChunkSize = size
d.reconfigMutex.Unlock()
}

func (d *DiskBuffer) SetMaxChunkDelay(delay time.Duration) {
d.reconfigMutex.Lock()
d.maxChunkDelay = delay
d.reconfigMutex.Unlock()
}

// newFlushFunc returns a function that marks read entries as flushed
func (d *DiskBuffer) newClearer(newRead []*readEntry) Clearer {
return &diskClearer{
Expand Down
29 changes: 27 additions & 2 deletions operator/buffer/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type MemoryBuffer struct {
sem *semaphore.Weighted
maxChunkDelay time.Duration
maxChunkSize uint
reconfigMutex sync.RWMutex
}

// Add inserts an entry into the memory database, blocking until there is space
Expand Down Expand Up @@ -105,15 +106,15 @@ func (m *MemoryBuffer) Read(dst []*entry.Entry) (Clearer, int, error) {

// ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation
func (m *MemoryBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) {
entries := make([]*entry.Entry, m.maxChunkSize)
entries := make([]*entry.Entry, m.MaxChunkSize())
for {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}

ctx, cancel := context.WithTimeout(ctx, m.maxChunkDelay)
ctx, cancel := context.WithTimeout(ctx, m.MaxChunkDelay())
defer cancel()
flushFunc, n, err := m.ReadWait(ctx, entries)
if n > 0 {
Expand Down Expand Up @@ -145,6 +146,30 @@ func (m *MemoryBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (Cleare
return m.newClearer(inFlightIDs[:i]), i, nil
}

func (m *MemoryBuffer) MaxChunkSize() uint {
m.reconfigMutex.RLock()
defer m.reconfigMutex.RUnlock()
return m.maxChunkSize
}

func (m *MemoryBuffer) MaxChunkDelay() time.Duration {
m.reconfigMutex.RLock()
defer m.reconfigMutex.RUnlock()
return m.maxChunkDelay
}

func (m *MemoryBuffer) SetMaxChunkSize(size uint) {
m.reconfigMutex.Lock()
m.maxChunkSize = size
m.reconfigMutex.Unlock()
}

func (m *MemoryBuffer) SetMaxChunkDelay(delay time.Duration) {
m.reconfigMutex.Lock()
m.maxChunkDelay = delay
m.reconfigMutex.Unlock()
}

type memoryClearer struct {
buffer *MemoryBuffer
ids []uint64
Expand Down
18 changes: 18 additions & 0 deletions operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/tls"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -45,6 +46,7 @@ type TCPInputConfig struct {
MaxBufferSize helper.ByteSize `json:"max_buffer_size,omitempty" yaml:"max_buffer_size,omitempty"`
ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
TLS TLSConfig `json:"tls,omitempty" yaml:"tls,omitempty"`
AddLabels bool `json:"add_labels,omitempty" yaml:"add_labels,omitempty"`
}

// TLSConfig is the configuration for a TLS listener
Expand Down Expand Up @@ -106,6 +108,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
InputOperator: inputOperator,
address: c.ListenAddress,
maxBufferSize: int(c.MaxBufferSize),
addLabels: c.AddLabels,
tlsEnable: c.TLS.Enable,
tlsKeyPair: cert,
backoff: backoff.Backoff{
Expand All @@ -123,6 +126,7 @@ type TCPInput struct {
helper.InputOperator
address string
maxBufferSize int
addLabels bool
tlsEnable bool
tlsKeyPair tls.Certificate
backoff backoff.Backoff
Expand Down Expand Up @@ -228,6 +232,20 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c
t.Errorw("Failed to create entry", zap.Error(err))
continue
}

if t.addLabels {
entry.AddLabel("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
entry.AddLabel("net.peer.ip", addr.IP.String())
entry.AddLabel("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
}

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
entry.AddLabel("net.host.ip", addr.IP.String())
entry.AddLabel("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
}
}

t.Write(ctx, entry)
}
if err := scanner.Err(); err != nil {
Expand Down
66 changes: 66 additions & 0 deletions operator/builtin/input/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"net"
"os"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -113,6 +114,66 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) {
}
}

func tcpInputLabelsTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {
cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = ":0"
cfg.AddLabels = true

ops, err := cfg.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
op := ops[0]

mockOutput := testutil.Operator{}
tcpInput := op.(*TCPInput)
tcpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput}

entryChan := make(chan *entry.Entry, 1)
mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
entryChan <- args.Get(1).(*entry.Entry)
}).Return(nil)

err = tcpInput.Start()
require.NoError(t, err)
defer tcpInput.Stop()

conn, err := net.Dial("tcp", tcpInput.listener.Addr().String())
require.NoError(t, err)
defer conn.Close()

_, err = conn.Write(input)
require.NoError(t, err)

for _, expectedMessage := range expected {
select {
case entry := <-entryChan:
expectedLabels := map[string]string{
"net.transport": "IP.TCP",
}
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
expectedLabels["net.host.ip"] = addr.IP.String()
expectedLabels["net.host.port"] = strconv.FormatInt(int64(addr.Port), 10)
}
if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
expectedLabels["net.peer.ip"] = addr.IP.String()
expectedLabels["net.peer.port"] = strconv.FormatInt(int64(addr.Port), 10)
}
require.Equal(t, expectedMessage, entry.Record)
require.Equal(t, expectedLabels, entry.Labels)
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for message to be written")
}
}

select {
case entry := <-entryChan:
require.FailNow(t, "Unexpected entry: %s", entry)
case <-time.After(100 * time.Millisecond):
return
}
}
}

func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {

Expand Down Expand Up @@ -280,6 +341,11 @@ func TestTcpInput(t *testing.T) {
t.Run("CarriageReturn", tcpInputTest([]byte("message\r\n"), []string{"message"}))
}

func TestTcpInputAattributes(t *testing.T) {
t.Run("Simple", tcpInputLabelsTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tcpInputLabelsTest([]byte("message\r\n"), []string{"message"}))
}

func TestTLSTcpInput(t *testing.T) {
t.Run("Simple", tlsTCPInputTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tlsTCPInputTest([]byte("message\r\n"), []string{"message"}))
Expand Down
Loading

0 comments on commit 2a0910e

Please sign in to comment.