Skip to content

Commit

Permalink
Make the MaxChunkDelay and MaxChunkSize available
Browse files Browse the repository at this point in the history
  • Loading branch information
andykellr committed May 24, 2021
1 parent ec0b0e2 commit c3bf914
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 12 deletions.
2 changes: 2 additions & 0 deletions operator/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Buffer interface {
ReadWait(context.Context, []*entry.Entry) (Clearer, int, error)
ReadChunk(context.Context) ([]*entry.Entry, Clearer, error)
Close() error
MaxChunkDelay() time.Duration
MaxChunkSize() uint
SetMaxChunkDelay(time.Duration)
SetMaxChunkSize(uint)
}
Expand Down
20 changes: 14 additions & 6 deletions operator/buffer/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,19 +244,15 @@ LOOP:

// ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation
func (d *DiskBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) {
d.reconfigMutex.RLock()
entries := make([]*entry.Entry, d.maxChunkSize)
d.reconfigMutex.RUnlock()
entries := make([]*entry.Entry, d.MaxChunkSize())
for {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}

d.reconfigMutex.RLock()
ctx, cancel := context.WithTimeout(ctx, d.maxChunkDelay)
d.reconfigMutex.RUnlock()
ctx, cancel := context.WithTimeout(ctx, d.MaxChunkDelay())
defer cancel()
flushFunc, n, err := d.ReadWait(ctx, entries)
if n > 0 {
Expand Down Expand Up @@ -319,6 +315,18 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (f Clearer, i int, err error) {
return d.newClearer(newRead), readCount, nil
}

func (d *DiskBuffer) MaxChunkSize() uint {
d.reconfigMutex.RLock()
defer d.reconfigMutex.RUnlock()
return d.maxChunkSize
}

func (d *DiskBuffer) MaxChunkDelay() time.Duration {
d.reconfigMutex.RLock()
defer d.reconfigMutex.RUnlock()
return d.maxChunkDelay
}

func (d *DiskBuffer) SetMaxChunkSize(size uint) {
d.reconfigMutex.Lock()
d.maxChunkSize = size
Expand Down
20 changes: 14 additions & 6 deletions operator/buffer/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,19 +106,15 @@ func (m *MemoryBuffer) Read(dst []*entry.Entry) (Clearer, int, error) {

// ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation
func (m *MemoryBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) {
m.reconfigMutex.RLock()
entries := make([]*entry.Entry, m.maxChunkSize)
m.reconfigMutex.RUnlock()
entries := make([]*entry.Entry, m.MaxChunkSize())
for {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}

m.reconfigMutex.RLock()
ctx, cancel := context.WithTimeout(ctx, m.maxChunkDelay)
m.reconfigMutex.RUnlock()
ctx, cancel := context.WithTimeout(ctx, m.MaxChunkDelay())
defer cancel()
flushFunc, n, err := m.ReadWait(ctx, entries)
if n > 0 {
Expand Down Expand Up @@ -150,6 +146,18 @@ func (m *MemoryBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (Cleare
return m.newClearer(inFlightIDs[:i]), i, nil
}

func (m *MemoryBuffer) MaxChunkSize() uint {
m.reconfigMutex.RLock()
defer m.reconfigMutex.RUnlock()
return m.maxChunkSize
}

func (m *MemoryBuffer) MaxChunkDelay() time.Duration {
m.reconfigMutex.RLock()
defer m.reconfigMutex.RUnlock()
return m.maxChunkDelay
}

func (m *MemoryBuffer) SetMaxChunkSize(size uint) {
m.reconfigMutex.Lock()
m.maxChunkSize = size
Expand Down

0 comments on commit c3bf914

Please sign in to comment.