diff --git a/operator/buffer/buffer.go b/operator/buffer/buffer.go index 5dd631e0c..ee1326f98 100644 --- a/operator/buffer/buffer.go +++ b/operator/buffer/buffer.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator" @@ -16,6 +17,10 @@ type Buffer interface { ReadWait(context.Context, []*entry.Entry) (Clearer, int, error) ReadChunk(context.Context) ([]*entry.Entry, Clearer, error) Close() error + MaxChunkDelay() time.Duration + MaxChunkSize() uint + SetMaxChunkDelay(time.Duration) + SetMaxChunkSize(uint) } // Config is a struct that wraps a Builder diff --git a/operator/buffer/disk.go b/operator/buffer/disk.go index a8c245fbc..94ea1e9d1 100644 --- a/operator/buffer/disk.go +++ b/operator/buffer/disk.go @@ -103,6 +103,8 @@ type DiskBuffer struct { maxChunkDelay time.Duration maxChunkSize uint + + reconfigMutex sync.RWMutex } // NewDiskBuffer creates a new DiskBuffer @@ -242,7 +244,7 @@ LOOP: // ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation func (d *DiskBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) { - entries := make([]*entry.Entry, d.maxChunkSize) + entries := make([]*entry.Entry, d.MaxChunkSize()) for { select { case <-ctx.Done(): @@ -250,7 +252,7 @@ func (d *DiskBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, er default: } - ctx, cancel := context.WithTimeout(ctx, d.maxChunkDelay) + ctx, cancel := context.WithTimeout(ctx, d.MaxChunkDelay()) defer cancel() flushFunc, n, err := d.ReadWait(ctx, entries) if n > 0 { @@ -313,6 +315,30 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (f Clearer, i int, err error) { return d.newClearer(newRead), readCount, nil } +func (d *DiskBuffer) MaxChunkSize() uint { + d.reconfigMutex.RLock() + defer d.reconfigMutex.RUnlock() + return d.maxChunkSize +} + +func (d *DiskBuffer) MaxChunkDelay() time.Duration { + d.reconfigMutex.RLock() + defer d.reconfigMutex.RUnlock() + return d.maxChunkDelay +} + +func (d *DiskBuffer) SetMaxChunkSize(size uint) { + d.reconfigMutex.Lock() + d.maxChunkSize = size + d.reconfigMutex.Unlock() +} + +func (d *DiskBuffer) SetMaxChunkDelay(delay time.Duration) { + d.reconfigMutex.Lock() + d.maxChunkDelay = delay + d.reconfigMutex.Unlock() +} + // newFlushFunc returns a function that marks read entries as flushed func (d *DiskBuffer) newClearer(newRead []*readEntry) Clearer { return &diskClearer{ diff --git a/operator/buffer/memory.go b/operator/buffer/memory.go index a2b7b8211..3d0a24896 100644 --- a/operator/buffer/memory.go +++ b/operator/buffer/memory.go @@ -68,6 +68,7 @@ type MemoryBuffer struct { sem *semaphore.Weighted maxChunkDelay time.Duration maxChunkSize uint + reconfigMutex sync.RWMutex } // Add inserts an entry into the memory database, blocking until there is space @@ -105,7 +106,7 @@ func (m *MemoryBuffer) Read(dst []*entry.Entry) (Clearer, int, error) { // ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation func (m *MemoryBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) { - entries := make([]*entry.Entry, m.maxChunkSize) + entries := make([]*entry.Entry, m.MaxChunkSize()) for { select { case <-ctx.Done(): @@ -113,7 +114,7 @@ func (m *MemoryBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, default: } - ctx, cancel := context.WithTimeout(ctx, m.maxChunkDelay) + ctx, cancel := context.WithTimeout(ctx, m.MaxChunkDelay()) defer cancel() flushFunc, n, err := m.ReadWait(ctx, entries) if n > 0 { @@ -145,6 +146,30 @@ func (m *MemoryBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (Cleare return m.newClearer(inFlightIDs[:i]), i, nil } +func (m *MemoryBuffer) MaxChunkSize() uint { + m.reconfigMutex.RLock() + defer m.reconfigMutex.RUnlock() + return m.maxChunkSize +} + +func (m *MemoryBuffer) MaxChunkDelay() time.Duration { + m.reconfigMutex.RLock() + defer m.reconfigMutex.RUnlock() + return m.maxChunkDelay +} + +func (m *MemoryBuffer) SetMaxChunkSize(size uint) { + m.reconfigMutex.Lock() + m.maxChunkSize = size + m.reconfigMutex.Unlock() +} + +func (m *MemoryBuffer) SetMaxChunkDelay(delay time.Duration) { + m.reconfigMutex.Lock() + m.maxChunkDelay = delay + m.reconfigMutex.Unlock() +} + type memoryClearer struct { buffer *MemoryBuffer ids []uint64