Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #77 from mjs/http-listener-concurrency
Browse files Browse the repository at this point in the history
listener: Protect batch buffer from concurrent access
  • Loading branch information
oplehto authored May 15, 2018
2 parents 259feb2 + cd7972a commit 693993a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
5 changes: 5 additions & 0 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type Listener struct {

wg sync.WaitGroup
stop chan struct{}
mu sync.Mutex // only used for HTTP listener
}

// Stop shuts down a running listener. It should be called exactly
Expand Down Expand Up @@ -205,15 +206,19 @@ func (l *Listener) listenUDP(sc *net.UDPConn) {
func (l *Listener) setupHTTP() *http.Server {
mux := http.NewServeMux()
mux.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
l.mu.Lock()
bytesRead, err := l.batch.readFrom(r.Body)
l.mu.Unlock()
if err != nil {
l.stats.Inc(statReadErrors)
}
if bytesRead > 0 {
if l.c.Debug {
log.Printf("HTTP listener read %d bytes", bytesRead)
}
l.mu.Lock()
l.processRead()
l.mu.Unlock()
}
})
return &http.Server{
Expand Down
56 changes: 56 additions & 0 deletions listener/listener_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,62 @@ func TestHTTPListenerBigPOST(t *testing.T) {
assertMonitor(t, monitorCh, 1, 1)
}

func TestHTTPListenerConcurrency(t *testing.T) {
conf := testConfig()
listener, err := StartHTTPListener(conf)
require.NoError(t, err)
spouttest.AssertReadyProbe(t, conf.ProbePort)
defer listener.Stop()

listenerCh, unsubListener := subListener(t)
defer unsubListener()

// Send the same line many times from multiple goroutines.
const senders = 10
const sendCount = 100
const totalLines = senders * sendCount
sendLine := fmt.Sprintf("cpu load=0.69 foo=bar %d\n", time.Now().UnixNano())

url := fmt.Sprintf("http://localhost:%d/write", listenPort)
errs := make(chan error, senders)
for sender := 0; sender < senders; sender++ {
go func() {
client := new(http.Client)
for i := 0; i < sendCount; i++ {
_, err := client.Post(url, "text/plain", bytes.NewBufferString(sendLine))
if err != nil {
errs <- err
}
}
errs <- nil
}()
}

// Wait for the senders to be done sending, and all the lines to
// be returned.
sendersDone := 0
received := 0
timeout := time.After(spouttest.LongWait)
for received < totalLines || sendersDone < senders {
select {
case lines := <-listenerCh:
for _, line := range strings.SplitAfter(lines, "\n") {
if len(line) > 0 {
require.Equal(t, sendLine, line)
received++
}
}
case err := <-errs:
require.NoError(t, err)
sendersDone++
case <-timeout:
t.Fatal("timed out waiting for lines")
}
}

assertNoMore(t, listenerCh)
}

func BenchmarkListenerLatency(b *testing.B) {
listener := startListener(b, testConfig())
defer listener.Stop()
Expand Down

0 comments on commit 693993a

Please sign in to comment.