Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

listener: Protect batch buffer from concurrent access #77

Merged
merged 1 commit into from
May 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type Listener struct {

wg sync.WaitGroup
stop chan struct{}
mu sync.Mutex // only used for HTTP listener
}

// Stop shuts down a running listener. It should be called exactly
Expand Down Expand Up @@ -205,15 +206,19 @@ func (l *Listener) listenUDP(sc *net.UDPConn) {
func (l *Listener) setupHTTP() *http.Server {
mux := http.NewServeMux()
mux.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
l.mu.Lock()
bytesRead, err := l.batch.readFrom(r.Body)
l.mu.Unlock()
if err != nil {
l.stats.Inc(statReadErrors)
}
if bytesRead > 0 {
if l.c.Debug {
log.Printf("HTTP listener read %d bytes", bytesRead)
}
l.mu.Lock()
l.processRead()
l.mu.Unlock()
}
})
return &http.Server{
Expand Down
56 changes: 56 additions & 0 deletions listener/listener_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,62 @@ func TestHTTPListenerBigPOST(t *testing.T) {
assertMonitor(t, monitorCh, 1, 1)
}

func TestHTTPListenerConcurrency(t *testing.T) {
conf := testConfig()
listener, err := StartHTTPListener(conf)
require.NoError(t, err)
spouttest.AssertReadyProbe(t, conf.ProbePort)
defer listener.Stop()

listenerCh, unsubListener := subListener(t)
defer unsubListener()

// Send the same line many times from multiple goroutines.
const senders = 10
const sendCount = 100
const totalLines = senders * sendCount
sendLine := fmt.Sprintf("cpu load=0.69 foo=bar %d\n", time.Now().UnixNano())

url := fmt.Sprintf("http://localhost:%d/write", listenPort)
errs := make(chan error, senders)
for sender := 0; sender < senders; sender++ {
go func() {
client := new(http.Client)
for i := 0; i < sendCount; i++ {
_, err := client.Post(url, "text/plain", bytes.NewBufferString(sendLine))
if err != nil {
errs <- err
}
}
errs <- nil
}()
}

// Wait for the senders to be done sending, and all the lines to
// be returned.
sendersDone := 0
received := 0
timeout := time.After(spouttest.LongWait)
for received < totalLines || sendersDone < senders {
select {
case lines := <-listenerCh:
for _, line := range strings.SplitAfter(lines, "\n") {
if len(line) > 0 {
require.Equal(t, sendLine, line)
received++
}
}
case err := <-errs:
require.NoError(t, err)
sendersDone++
case <-timeout:
t.Fatal("timed out waiting for lines")
}
}

assertNoMore(t, listenerCh)
}

func BenchmarkListenerLatency(b *testing.B) {
listener := startListener(b, testConfig())
defer listener.Stop()
Expand Down