Skip to content

Commit

Permalink
Make buffer max chunk delay reconfigurable on the fly (#313)
Browse files Browse the repository at this point in the history
* Try making buffer max chunk delay reconfigurable on the fly

* Make buffer max_chunk_size configurable on the fly

* Make the MaxChunkDelay and MaxChunkSize available

Co-authored-by: Andy Keller <andykellr@users.noreply.github.com>
  • Loading branch information
djaglowski and andykellr authored May 24, 2021
1 parent cd2cf8b commit eda97c7
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 4 deletions.
5 changes: 5 additions & 0 deletions operator/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
Expand All @@ -16,6 +17,10 @@ type Buffer interface {
ReadWait(context.Context, []*entry.Entry) (Clearer, int, error)
ReadChunk(context.Context) ([]*entry.Entry, Clearer, error)
Close() error
MaxChunkDelay() time.Duration
MaxChunkSize() uint
SetMaxChunkDelay(time.Duration)
SetMaxChunkSize(uint)
}

// Config is a struct that wraps a Builder
Expand Down
30 changes: 28 additions & 2 deletions operator/buffer/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type DiskBuffer struct {

maxChunkDelay time.Duration
maxChunkSize uint

reconfigMutex sync.RWMutex
}

// NewDiskBuffer creates a new DiskBuffer
Expand Down Expand Up @@ -242,15 +244,15 @@ LOOP:

// ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation
func (d *DiskBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) {
entries := make([]*entry.Entry, d.maxChunkSize)
entries := make([]*entry.Entry, d.MaxChunkSize())
for {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}

ctx, cancel := context.WithTimeout(ctx, d.maxChunkDelay)
ctx, cancel := context.WithTimeout(ctx, d.MaxChunkDelay())
defer cancel()
flushFunc, n, err := d.ReadWait(ctx, entries)
if n > 0 {
Expand Down Expand Up @@ -313,6 +315,30 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (f Clearer, i int, err error) {
return d.newClearer(newRead), readCount, nil
}

func (d *DiskBuffer) MaxChunkSize() uint {
d.reconfigMutex.RLock()
defer d.reconfigMutex.RUnlock()
return d.maxChunkSize
}

func (d *DiskBuffer) MaxChunkDelay() time.Duration {
d.reconfigMutex.RLock()
defer d.reconfigMutex.RUnlock()
return d.maxChunkDelay
}

func (d *DiskBuffer) SetMaxChunkSize(size uint) {
d.reconfigMutex.Lock()
d.maxChunkSize = size
d.reconfigMutex.Unlock()
}

func (d *DiskBuffer) SetMaxChunkDelay(delay time.Duration) {
d.reconfigMutex.Lock()
d.maxChunkDelay = delay
d.reconfigMutex.Unlock()
}

// newFlushFunc returns a function that marks read entries as flushed
func (d *DiskBuffer) newClearer(newRead []*readEntry) Clearer {
return &diskClearer{
Expand Down
29 changes: 27 additions & 2 deletions operator/buffer/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type MemoryBuffer struct {
sem *semaphore.Weighted
maxChunkDelay time.Duration
maxChunkSize uint
reconfigMutex sync.RWMutex
}

// Add inserts an entry into the memory database, blocking until there is space
Expand Down Expand Up @@ -105,15 +106,15 @@ func (m *MemoryBuffer) Read(dst []*entry.Entry) (Clearer, int, error) {

// ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation
func (m *MemoryBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) {
entries := make([]*entry.Entry, m.maxChunkSize)
entries := make([]*entry.Entry, m.MaxChunkSize())
for {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}

ctx, cancel := context.WithTimeout(ctx, m.maxChunkDelay)
ctx, cancel := context.WithTimeout(ctx, m.MaxChunkDelay())
defer cancel()
flushFunc, n, err := m.ReadWait(ctx, entries)
if n > 0 {
Expand Down Expand Up @@ -145,6 +146,30 @@ func (m *MemoryBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (Cleare
return m.newClearer(inFlightIDs[:i]), i, nil
}

func (m *MemoryBuffer) MaxChunkSize() uint {
m.reconfigMutex.RLock()
defer m.reconfigMutex.RUnlock()
return m.maxChunkSize
}

func (m *MemoryBuffer) MaxChunkDelay() time.Duration {
m.reconfigMutex.RLock()
defer m.reconfigMutex.RUnlock()
return m.maxChunkDelay
}

func (m *MemoryBuffer) SetMaxChunkSize(size uint) {
m.reconfigMutex.Lock()
m.maxChunkSize = size
m.reconfigMutex.Unlock()
}

func (m *MemoryBuffer) SetMaxChunkDelay(delay time.Duration) {
m.reconfigMutex.Lock()
m.maxChunkDelay = delay
m.reconfigMutex.Unlock()
}

type memoryClearer struct {
buffer *MemoryBuffer
ids []uint64
Expand Down

0 comments on commit eda97c7

Please sign in to comment.