From d786ba8eb3ccc22e6c43c71eb45fb5c7dd353782 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Fri, 21 May 2021 15:34:56 -0400 Subject: [PATCH 1/3] Try making buffer max chunk delay reconfigurable on the fly --- operator/buffer/buffer.go | 2 ++ operator/buffer/disk.go | 8 ++++++++ operator/buffer/memory.go | 7 +++++++ 3 files changed, 17 insertions(+) diff --git a/operator/buffer/buffer.go b/operator/buffer/buffer.go index 5dd631e0c..11f99e098 100644 --- a/operator/buffer/buffer.go +++ b/operator/buffer/buffer.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator" @@ -16,6 +17,7 @@ type Buffer interface { ReadWait(context.Context, []*entry.Entry) (Clearer, int, error) ReadChunk(context.Context) ([]*entry.Entry, Clearer, error) Close() error + SetMaxChunkDelay(time.Duration) } // Config is a struct that wraps a Builder diff --git a/operator/buffer/disk.go b/operator/buffer/disk.go index a8c245fbc..559712a23 100644 --- a/operator/buffer/disk.go +++ b/operator/buffer/disk.go @@ -103,6 +103,8 @@ type DiskBuffer struct { maxChunkDelay time.Duration maxChunkSize uint + + reconfigMutex sync.Mutex } // NewDiskBuffer creates a new DiskBuffer @@ -313,6 +315,12 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (f Clearer, i int, err error) { return d.newClearer(newRead), readCount, nil } +func (d *DiskBuffer) SetMaxChunkDelay(delay time.Duration) { + d.reconfigMutex.Lock() + defer d.reconfigMutex.Unlock() + d.maxChunkDelay = delay +} + // newFlushFunc returns a function that marks read entries as flushed func (d *DiskBuffer) newClearer(newRead []*readEntry) Clearer { return &diskClearer{ diff --git a/operator/buffer/memory.go b/operator/buffer/memory.go index a2b7b8211..913c956c9 100644 --- a/operator/buffer/memory.go +++ b/operator/buffer/memory.go @@ -68,6 +68,7 @@ type MemoryBuffer struct { sem *semaphore.Weighted maxChunkDelay time.Duration maxChunkSize uint + reconfigMutex sync.Mutex } // Add inserts an entry into the memory database, blocking until there is space @@ -145,6 +146,12 @@ func (m *MemoryBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (Cleare return m.newClearer(inFlightIDs[:i]), i, nil } +func (m *MemoryBuffer) SetMaxChunkDelay(delay time.Duration) { + m.reconfigMutex.Lock() + defer m.reconfigMutex.Unlock() + m.maxChunkDelay = delay +} + type memoryClearer struct { buffer *MemoryBuffer ids []uint64 From ec0b0e2cd04544d5ff3cff9a8134cf569a6bcc9a Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Fri, 21 May 2021 16:06:25 -0400 Subject: [PATCH 2/3] Make buffer max_chunk_size configurable on the fly --- operator/buffer/buffer.go | 1 + operator/buffer/disk.go | 14 ++++++++++++-- operator/buffer/memory.go | 14 ++++++++++++-- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/operator/buffer/buffer.go b/operator/buffer/buffer.go index 11f99e098..cf3bfd15d 100644 --- a/operator/buffer/buffer.go +++ b/operator/buffer/buffer.go @@ -18,6 +18,7 @@ type Buffer interface { ReadChunk(context.Context) ([]*entry.Entry, Clearer, error) Close() error SetMaxChunkDelay(time.Duration) + SetMaxChunkSize(uint) } // Config is a struct that wraps a Builder diff --git a/operator/buffer/disk.go b/operator/buffer/disk.go index 559712a23..b2013f473 100644 --- a/operator/buffer/disk.go +++ b/operator/buffer/disk.go @@ -104,7 +104,7 @@ type DiskBuffer struct { maxChunkDelay time.Duration maxChunkSize uint - reconfigMutex sync.Mutex + reconfigMutex sync.RWMutex } // NewDiskBuffer creates a new DiskBuffer @@ -244,7 +244,9 @@ LOOP: // ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation func (d *DiskBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) { + d.reconfigMutex.RLock() entries := make([]*entry.Entry, d.maxChunkSize) + d.reconfigMutex.RUnlock() for { select { case <-ctx.Done(): @@ -252,7 +254,9 @@ func (d *DiskBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, er default: } + d.reconfigMutex.RLock() ctx, cancel := context.WithTimeout(ctx, d.maxChunkDelay) + d.reconfigMutex.RUnlock() defer cancel() flushFunc, n, err := d.ReadWait(ctx, entries) if n > 0 { @@ -315,10 +319,16 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (f Clearer, i int, err error) { return d.newClearer(newRead), readCount, nil } +func (d *DiskBuffer) SetMaxChunkSize(size uint) { + d.reconfigMutex.Lock() + d.maxChunkSize = size + d.reconfigMutex.Unlock() +} + func (d *DiskBuffer) SetMaxChunkDelay(delay time.Duration) { d.reconfigMutex.Lock() - defer d.reconfigMutex.Unlock() d.maxChunkDelay = delay + d.reconfigMutex.Unlock() } // newFlushFunc returns a function that marks read entries as flushed diff --git a/operator/buffer/memory.go b/operator/buffer/memory.go index 913c956c9..36e2e1b82 100644 --- a/operator/buffer/memory.go +++ b/operator/buffer/memory.go @@ -68,7 +68,7 @@ type MemoryBuffer struct { sem *semaphore.Weighted maxChunkDelay time.Duration maxChunkSize uint - reconfigMutex sync.Mutex + reconfigMutex sync.RWMutex } // Add inserts an entry into the memory database, blocking until there is space @@ -106,7 +106,9 @@ func (m *MemoryBuffer) Read(dst []*entry.Entry) (Clearer, int, error) { // ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation func (m *MemoryBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) { + m.reconfigMutex.RLock() entries := make([]*entry.Entry, m.maxChunkSize) + m.reconfigMutex.RUnlock() for { select { case <-ctx.Done(): @@ -114,7 +116,9 @@ func (m *MemoryBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, default: } + m.reconfigMutex.RLock() ctx, cancel := context.WithTimeout(ctx, m.maxChunkDelay) + m.reconfigMutex.RUnlock() defer cancel() flushFunc, n, err := m.ReadWait(ctx, entries) if n > 0 { @@ -146,10 +150,16 @@ func (m *MemoryBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (Cleare return m.newClearer(inFlightIDs[:i]), i, nil } +func (m *MemoryBuffer) SetMaxChunkSize(size uint) { + m.reconfigMutex.Lock() + m.maxChunkSize = size + m.reconfigMutex.Unlock() +} + func (m *MemoryBuffer) SetMaxChunkDelay(delay time.Duration) { m.reconfigMutex.Lock() - defer m.reconfigMutex.Unlock() m.maxChunkDelay = delay + m.reconfigMutex.Unlock() } type memoryClearer struct { From c3bf9142c3ab6f32818ecbecbbfaf261ea6a94ed Mon Sep 17 00:00:00 2001 From: Andy Keller Date: Mon, 24 May 2021 10:48:27 -0400 Subject: [PATCH 3/3] Make the MaxChunkDelay and MaxChunkSize available --- operator/buffer/buffer.go | 2 ++ operator/buffer/disk.go | 20 ++++++++++++++------ operator/buffer/memory.go | 20 ++++++++++++++------ 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/operator/buffer/buffer.go b/operator/buffer/buffer.go index cf3bfd15d..ee1326f98 100644 --- a/operator/buffer/buffer.go +++ b/operator/buffer/buffer.go @@ -17,6 +17,8 @@ type Buffer interface { ReadWait(context.Context, []*entry.Entry) (Clearer, int, error) ReadChunk(context.Context) ([]*entry.Entry, Clearer, error) Close() error + MaxChunkDelay() time.Duration + MaxChunkSize() uint SetMaxChunkDelay(time.Duration) SetMaxChunkSize(uint) } diff --git a/operator/buffer/disk.go b/operator/buffer/disk.go index b2013f473..94ea1e9d1 100644 --- a/operator/buffer/disk.go +++ b/operator/buffer/disk.go @@ -244,9 +244,7 @@ LOOP: // ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation func (d *DiskBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) { - d.reconfigMutex.RLock() - entries := make([]*entry.Entry, d.maxChunkSize) - d.reconfigMutex.RUnlock() + entries := make([]*entry.Entry, d.MaxChunkSize()) for { select { case <-ctx.Done(): @@ -254,9 +252,7 @@ func (d *DiskBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, er default: } - d.reconfigMutex.RLock() - ctx, cancel := context.WithTimeout(ctx, d.maxChunkDelay) - d.reconfigMutex.RUnlock() + ctx, cancel := context.WithTimeout(ctx, d.MaxChunkDelay()) defer cancel() flushFunc, n, err := d.ReadWait(ctx, entries) if n > 0 { @@ -319,6 +315,18 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (f Clearer, i int, err error) { return d.newClearer(newRead), readCount, nil } +func (d *DiskBuffer) MaxChunkSize() uint { + d.reconfigMutex.RLock() + defer d.reconfigMutex.RUnlock() + return d.maxChunkSize +} + +func (d *DiskBuffer) MaxChunkDelay() time.Duration { + d.reconfigMutex.RLock() + defer d.reconfigMutex.RUnlock() + return d.maxChunkDelay +} + func (d *DiskBuffer) SetMaxChunkSize(size uint) { d.reconfigMutex.Lock() d.maxChunkSize = size diff --git a/operator/buffer/memory.go b/operator/buffer/memory.go index 36e2e1b82..3d0a24896 100644 --- a/operator/buffer/memory.go +++ b/operator/buffer/memory.go @@ -106,9 +106,7 @@ func (m *MemoryBuffer) Read(dst []*entry.Entry) (Clearer, int, error) { // ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation func (m *MemoryBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) { - m.reconfigMutex.RLock() - entries := make([]*entry.Entry, m.maxChunkSize) - m.reconfigMutex.RUnlock() + entries := make([]*entry.Entry, m.MaxChunkSize()) for { select { case <-ctx.Done(): @@ -116,9 +114,7 @@ func (m *MemoryBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, default: } - m.reconfigMutex.RLock() - ctx, cancel := context.WithTimeout(ctx, m.maxChunkDelay) - m.reconfigMutex.RUnlock() + ctx, cancel := context.WithTimeout(ctx, m.MaxChunkDelay()) defer cancel() flushFunc, n, err := m.ReadWait(ctx, entries) if n > 0 { @@ -150,6 +146,18 @@ func (m *MemoryBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (Cleare return m.newClearer(inFlightIDs[:i]), i, nil } +func (m *MemoryBuffer) MaxChunkSize() uint { + m.reconfigMutex.RLock() + defer m.reconfigMutex.RUnlock() + return m.maxChunkSize +} + +func (m *MemoryBuffer) MaxChunkDelay() time.Duration { + m.reconfigMutex.RLock() + defer m.reconfigMutex.RUnlock() + return m.maxChunkDelay +} + func (m *MemoryBuffer) SetMaxChunkSize(size uint) { m.reconfigMutex.Lock() m.maxChunkSize = size