Skip to content

Commit

Permalink
http_listener: pre-allocate buffer for reading request body
Browse files Browse the repository at this point in the history
Fixes: #1823 and #1856
  • Loading branch information
ShalinPatel-Apigeek committed Oct 12, 2016
1 parent a65447d commit 4c670f7
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 25 deletions.
64 changes: 43 additions & 21 deletions plugins/inputs/http_listener/http_listener.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package http_listener

import (
"bufio"
"bytes"
"compress/gzip"
"fmt"
"log"
"net"
"net/http"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -117,31 +118,52 @@ func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case "/write":
var http400msg bytes.Buffer
var partial string
scanner := bufio.NewScanner(req.Body)
scanner.Buffer([]byte(""), 128*1024)
for scanner.Scan() {
metrics, err := t.parser.Parse(scanner.Bytes())
if err == nil {
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
partial = "partial write: "
} else {
defer func() {
if http400msg.Len() > 0 {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":"%s"}`, http400msg.String())))
}
}()

body := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
b, err := gzip.NewReader(req.Body)
if err != nil {
http400msg.WriteString(err.Error() + " ")
return
}
defer b.Close()
body = b
}

var bs []byte
if cl := req.Header.Get("Content-length"); cl != "" {
if l, err := strconv.Atoi(cl); err == nil {
bs = make([]byte, 0, l)
}
}

if err := scanner.Err(); err != nil {
http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError)
} else if http400msg.Len() > 0 {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String())))
} else {
res.WriteHeader(http.StatusNoContent)
buf := bytes.NewBuffer(bs)
_, err := buf.ReadFrom(body)
if err != nil {
log.Printf("E! HttpListener unable to read request body. data: [%s], error: %s\n", string(buf.Bytes()), err.Error())
http400msg.WriteString("HttpHandler unable to read from request body: " + err.Error())
return
}

metrics, err := t.parser.Parse(buf.Bytes())
if err != nil {
log.Printf("E! HttpListener unable to parse metrics. data: [%s], error: %s \n", string(buf.Bytes()), err.Error())
http400msg.WriteString("Error while parsing metrics: " + err.Error())
return
}

for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
res.WriteHeader(http.StatusNoContent)
case "/query":
// Deliver a dummy response to the query endpoint, as some InfluxDB
// clients test endpoint availability with a query
Expand Down
53 changes: 49 additions & 4 deletions plugins/inputs/http_listener/http_listener_test.go

Large diffs are not rendered by default.

0 comments on commit 4c670f7

Please sign in to comment.