Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make buffer max chunk delay reconfigurable on the fly #313

Merged
merged 3 commits into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions operator/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
Expand All @@ -16,6 +17,10 @@ type Buffer interface {
ReadWait(context.Context, []*entry.Entry) (Clearer, int, error)
ReadChunk(context.Context) ([]*entry.Entry, Clearer, error)
Close() error
MaxChunkDelay() time.Duration
MaxChunkSize() uint
SetMaxChunkDelay(time.Duration)
SetMaxChunkSize(uint)
}

// Config is a struct that wraps a Builder
Expand Down
30 changes: 28 additions & 2 deletions operator/buffer/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type DiskBuffer struct {

maxChunkDelay time.Duration
maxChunkSize uint

reconfigMutex sync.RWMutex
}

// NewDiskBuffer creates a new DiskBuffer
Expand Down Expand Up @@ -242,15 +244,15 @@ LOOP:

// ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation
func (d *DiskBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) {
entries := make([]*entry.Entry, d.maxChunkSize)
entries := make([]*entry.Entry, d.MaxChunkSize())
for {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}

ctx, cancel := context.WithTimeout(ctx, d.maxChunkDelay)
ctx, cancel := context.WithTimeout(ctx, d.MaxChunkDelay())
defer cancel()
flushFunc, n, err := d.ReadWait(ctx, entries)
if n > 0 {
Expand Down Expand Up @@ -313,6 +315,30 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (f Clearer, i int, err error) {
return d.newClearer(newRead), readCount, nil
}

func (d *DiskBuffer) MaxChunkSize() uint {
d.reconfigMutex.RLock()
defer d.reconfigMutex.RUnlock()
return d.maxChunkSize
}

func (d *DiskBuffer) MaxChunkDelay() time.Duration {
d.reconfigMutex.RLock()
defer d.reconfigMutex.RUnlock()
return d.maxChunkDelay
}

func (d *DiskBuffer) SetMaxChunkSize(size uint) {
d.reconfigMutex.Lock()
d.maxChunkSize = size
d.reconfigMutex.Unlock()
}

func (d *DiskBuffer) SetMaxChunkDelay(delay time.Duration) {
d.reconfigMutex.Lock()
d.maxChunkDelay = delay
d.reconfigMutex.Unlock()
}

// newFlushFunc returns a function that marks read entries as flushed
func (d *DiskBuffer) newClearer(newRead []*readEntry) Clearer {
return &diskClearer{
Expand Down
29 changes: 27 additions & 2 deletions operator/buffer/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type MemoryBuffer struct {
sem *semaphore.Weighted
maxChunkDelay time.Duration
maxChunkSize uint
reconfigMutex sync.RWMutex
}

// Add inserts an entry into the memory database, blocking until there is space
Expand Down Expand Up @@ -105,15 +106,15 @@ func (m *MemoryBuffer) Read(dst []*entry.Entry) (Clearer, int, error) {

// ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation
func (m *MemoryBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) {
entries := make([]*entry.Entry, m.maxChunkSize)
entries := make([]*entry.Entry, m.MaxChunkSize())
for {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}

ctx, cancel := context.WithTimeout(ctx, m.maxChunkDelay)
ctx, cancel := context.WithTimeout(ctx, m.MaxChunkDelay())
defer cancel()
flushFunc, n, err := m.ReadWait(ctx, entries)
if n > 0 {
Expand Down Expand Up @@ -145,6 +146,30 @@ func (m *MemoryBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (Cleare
return m.newClearer(inFlightIDs[:i]), i, nil
}

func (m *MemoryBuffer) MaxChunkSize() uint {
m.reconfigMutex.RLock()
defer m.reconfigMutex.RUnlock()
return m.maxChunkSize
}

func (m *MemoryBuffer) MaxChunkDelay() time.Duration {
m.reconfigMutex.RLock()
defer m.reconfigMutex.RUnlock()
return m.maxChunkDelay
}

func (m *MemoryBuffer) SetMaxChunkSize(size uint) {
m.reconfigMutex.Lock()
m.maxChunkSize = size
m.reconfigMutex.Unlock()
}

func (m *MemoryBuffer) SetMaxChunkDelay(delay time.Duration) {
m.reconfigMutex.Lock()
m.maxChunkDelay = delay
m.reconfigMutex.Unlock()
}

type memoryClearer struct {
buffer *MemoryBuffer
ids []uint64
Expand Down