Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
listener: Prevent large HTTP POSTs from being broken up
Browse files Browse the repository at this point in the history
This fixes an issue where a large HTTP POST, or a POST which arrives
when the batch buffer is nearly full, could get split, corrupting one
of the lines contained within it.

The batch buffer will now be grown if required to support any HTTP
POST that arrives. Growing the batch buffer is expensive and the
listener is arranged to minimise the chance of the buffer needing to
grow at all. Growing the buffer is a fallback.
  • Loading branch information
mjs committed May 15, 2018
1 parent 04dd6f1 commit 4a53ca9
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 21 deletions.
42 changes: 41 additions & 1 deletion listener/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package listener

import "io"
import (
"io"
)

// newBatch returns a new batch buffer with the initial capacity
// specified.
Expand Down Expand Up @@ -45,6 +47,35 @@ func (b *batch) readOnceFrom(r io.Reader) (int, error) {
return n, err
}

// readFrom drains r into the batch, appending to whatever the batch
// already holds. Whenever the free space drops to the low-water mark
// the buffer is grown before the next read, so the read always has
// room to land in. It returns the number of bytes appended by this
// call; io.EOF from the reader is treated as normal completion and
// reported as a nil error, while any other error is returned as-is
// together with the bytes read so far.
func (b *batch) readFrom(r io.Reader) (int, error) {
	// Free space at or below this many bytes triggers a grow.
	const lowWater = 512

	var total int
	for {
		if b.remaining() <= lowWater {
			b.grow()
		}

		// Read straight into the unused tail of the buffer.
		used := len(b.buf)
		n, err := r.Read(b.buf[used:cap(b.buf)])
		if n > 0 {
			b.buf = b.buf[:used+n]
			total += n
		}

		switch err {
		case nil:
			// More data may follow; keep reading.
		case io.EOF:
			return total, nil
		default:
			return total, err
		}
	}
}

// size reports how many bytes the batch is holding right now, i.e.
// the length (not the capacity) of its internal buffer.
func (b *batch) size() int {
	held := len(b.buf)
	return held
}

// remaining returns the number of bytes still unused in the batch.
func (b *batch) remaining() int {
return cap(b.buf) - len(b.buf)
Expand All @@ -60,3 +91,12 @@ func (b *batch) reset() {
func (b *batch) bytes() []byte {
// The internal slice is handed back directly; no copy is made, so
// the result aliases the batch's current buffer.
return b.buf
}

// grow doubles the capacity of the batch's internal buffer while
// preserving its current contents and length. This is expensive (a
// fresh allocation plus a copy) and should be avoided where possible.
//
// A buffer with zero capacity cannot be doubled, so in that case a
// small non-zero capacity is allocated instead; otherwise callers
// that grow-and-retry would never make progress.
func (b *batch) grow() {
	// Fallback capacity used only when the current capacity is zero.
	const minCapacity = 1024

	newCap := cap(b.buf) * 2
	if newCap == 0 {
		newCap = minCapacity
	}

	// Allocate with the final length and capacity directly, then carry
	// the existing bytes across.
	newBuf := make([]byte, len(b.buf), newCap)
	copy(newBuf, b.buf)
	b.buf = newBuf
}
32 changes: 12 additions & 20 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package listener

import (
"fmt"
"io"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -206,20 +205,15 @@ func (l *Listener) listenUDP(sc *net.UDPConn) {
func (l *Listener) setupHTTP() *http.Server {
mux := http.NewServeMux()
mux.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
for {
bytesRead, err := l.batch.readOnceFrom(r.Body)
if bytesRead > 0 {
if l.c.Debug {
log.Printf("HTTP listener read %d bytes", bytesRead)
}
l.processRead()
}
if err != nil {
if err != io.EOF {
l.stats.Inc(statReadErrors)
}
return
bytesRead, err := l.batch.readFrom(r.Body)
if err != nil {
l.stats.Inc(statReadErrors)
}
if bytesRead > 0 {
if l.c.Debug {
log.Printf("HTTP listener read %d bytes", bytesRead)
}
l.processRead()
}
})
return &http.Server{
Expand Down Expand Up @@ -251,12 +245,10 @@ func (l *Listener) processRead() {
// Send when the configured number of reads have been batched or
// the batch buffer is almost full.

// If the batch has less capacity left than the size of a maximum
// UDP datagram, then force a send to avoid growing the batch
// unnecessarily (allocations hurt performance). UDP datagrams of
// this size are practically unlikely but it's a nice number to
// use.
batchNearlyFull := l.batch.remaining() <= maxUDPDatagramSize
// If the batch size is within a (maximum) UDP datagram of the
// configured target batch size, then force a send to avoid
// growing the batch unnecessarily (allocations hurt performance).
batchNearlyFull := l.c.ListenerBatchBytes-l.batch.size() <= maxUDPDatagramSize

if statReceived%l.c.BatchMessages == 0 || batchNearlyFull {
l.stats.Inc(statSent)
Expand Down
33 changes: 33 additions & 0 deletions listener/listener_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,39 @@ func TestHTTPListener(t *testing.T) {
assertMonitor(t, monitorCh, numLines, numLines)
}

// TestHTTPListenerBigPOST checks that a single HTTP POST larger than
// the configured batch size is delivered as one intact batch rather
// than being split up.
func TestHTTPListenerBigPOST(t *testing.T) {
	conf := testConfig()
	conf.ListenerBatchBytes = 1024
	// Use a batch size > 1. Even though a single write will be made,
	// the batch should still get sent because the buffer size limit
	// is exceeded.
	conf.BatchMessages = 10

	listener, err := StartHTTPListener(conf)
	require.NoError(t, err)
	spouttest.AssertReadyProbe(t, conf.ProbePort)
	defer listener.Stop()

	listenerCh, unsubListener := subListener(t)
	defer unsubListener()

	monitorCh, unsubMonitor := subMonitor(t)
	defer unsubMonitor()

	// Send a post that's bigger than the configured batch size. This
	// will force the batch buffer to grow.
	buf := make([]byte, conf.ListenerBatchBytes+200)

	url := fmt.Sprintf("http://localhost:%d/write", listenPort)
	resp, err := http.Post(url, "text/plain", bytes.NewBuffer(buf))
	require.NoError(t, err)
	// The response body must always be closed, otherwise the
	// underlying connection is leaked.
	require.NoError(t, resp.Body.Close())

	assertBatch(t, listenerCh, string(buf))
	assertNoMore(t, listenerCh)

	assertMonitor(t, monitorCh, 1, 1)
}

func BenchmarkListenerLatency(b *testing.B) {
listener := startListener(b, testConfig())
defer listener.Stop()
Expand Down

0 comments on commit 4a53ca9

Please sign in to comment.