Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
listener: Add support for HTTP "precision" argument
Browse files Browse the repository at this point in the history
This specifies the precision for incoming timestamps, mimicking the
same feature in InfluxDB. A fast-path is used when "precision" isn't
supplied.
  • Loading branch information
mjs committed May 22, 2018
1 parent 20a8916 commit 842bcce
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 25 deletions.
13 changes: 13 additions & 0 deletions listener/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ func (b *batch) readFrom(r io.Reader) (int, error) {
}
}

// appendBytes adds some bytes to the batch, growing the batch if
// required.
func (b *batch) appendBytes(more []byte) {
lenMore := len(more)
for b.remaining() < lenMore {
b.grow()
}

lenBatch := len(b.buf)
b.buf = b.buf[:lenBatch+lenMore]
copy(b.buf[lenBatch:], more)
}

// size returns the number of bytes currently stored in the batch.
func (b *batch) size() int {
return len(b.buf)
Expand Down
121 changes: 102 additions & 19 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@
package listener

import (
"bufio"
"bytes"
"fmt"
"log"
"net"
"net/http"
"os"
"strconv"
"sync"
"time"

"github.com/nats-io/go-nats"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/influx"
"github.com/jumptrading/influx-spout/probes"
"github.com/jumptrading/influx-spout/stats"
nats "github.com/nats-io/go-nats"
)

const (
Expand Down Expand Up @@ -205,29 +208,109 @@ func (l *Listener) listenUDP(sc *net.UDPConn) {

func (l *Listener) setupHTTP() *http.Server {
mux := http.NewServeMux()
mux.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
l.mu.Lock()
bytesRead, err := l.batch.readFrom(r.Body)
l.mu.Unlock()
if err != nil {
l.stats.Inc(statReadErrors)
}
if bytesRead > 0 {
if l.c.Debug {
log.Printf("HTTP listener read %d bytes", bytesRead)
}
l.mu.Lock()
l.processRead()
l.mu.Unlock()
}
w.WriteHeader(http.StatusNoContent)
})
mux.HandleFunc("/write", l.handleHTTPWrite)
return &http.Server{
Addr: fmt.Sprintf(":%d", l.c.Port),
Handler: mux,
}
}

func (l *Listener) handleHTTPWrite(w http.ResponseWriter, r *http.Request) {
bytesRead, err := l.readHTTPBody(r)
if bytesRead > 0 {
if l.c.Debug {
log.Printf("HTTP listener read %d bytes", bytesRead)
}
l.mu.Lock()
l.processRead()
l.mu.Unlock()
}
if err != nil {
l.stats.Inc(statReadErrors)
}
w.WriteHeader(http.StatusNoContent)
}

func (l *Listener) readHTTPBody(r *http.Request) (int, error) {
precision := r.URL.Query().Get("precision")

if precision == "" || precision == "ns" {
// Fast-path when timestamps are already in nanoseconds - no
// need for conversion.
return l.readHTTPBodyNanos(r)
}

// Non-nanosecond precison specified. Read lines individually and
// convert timestamps to nanoseconds.
return l.readHTTPBodyWithPrecision(r, precision)
}

func (l *Listener) readHTTPBodyNanos(r *http.Request) (int, error) {
l.mu.Lock()
defer l.mu.Unlock()
return l.batch.readFrom(r.Body)
}

func (l *Listener) readHTTPBodyWithPrecision(r *http.Request, precision string) (int, error) {
scanner := bufio.NewScanner(r.Body)

// scanLines is like bufio.ScanLines but the returned lines
// includes the trailing newlines. Leaving the newline on the line
// is useful for incoming lines that don't contain a timestamp and
// therefore should pass through unchanged.
scanner.Split(scanLines)

bytesRead := 0
for scanner.Scan() {
line := scanner.Bytes()
bytesRead += len(line)
if len(line) <= 1 {
continue
}

newLine := applyTimestampPrecision(line, precision)
l.mu.Lock()
l.batch.appendBytes(newLine)
l.mu.Unlock()
}
return bytesRead, scanner.Err()
}

func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// We have a full newline-terminated line.
return i + 1, data[0 : i+1], nil

}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, nil

}
// Request more data.
return 0, nil, nil
}

func applyTimestampPrecision(line []byte, precision string) []byte {
ts, offset := influx.ExtractTimestamp(line)
if offset == -1 {
return line
}

newTs, err := influx.SafeCalcTime(ts, precision)
if err != nil {
return line
}

newLine := make([]byte, offset, offset+influx.MaxTsLen+1)
copy(newLine, line[:offset])
newLine = strconv.AppendInt(newLine, newTs.UnixNano(), 10)
return append(newLine, '\n')
}

func (l *Listener) listenHTTP(server *http.Server) {
defer l.wg.Done()

Expand Down
56 changes: 50 additions & 6 deletions listener/listener_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,7 @@ loop:

func TestHTTPListener(t *testing.T) {
conf := testConfig()
listener, err := StartHTTPListener(conf)
require.NoError(t, err)
spouttest.AssertReadyProbe(t, conf.ProbePort)
listener := startHTTPListener(t, conf)
defer listener.Stop()

listenerCh, unsubListener := subListener(t)
Expand Down Expand Up @@ -245,9 +243,7 @@ func TestHTTPListenerBigPOST(t *testing.T) {

func TestHTTPListenerConcurrency(t *testing.T) {
conf := testConfig()
listener, err := StartHTTPListener(conf)
require.NoError(t, err)
spouttest.AssertReadyProbe(t, conf.ProbePort)
listener := startHTTPListener(t, conf)
defer listener.Stop()

listenerCh, unsubListener := subListener(t)
Expand Down Expand Up @@ -299,6 +295,43 @@ func TestHTTPListenerConcurrency(t *testing.T) {
assertNoMore(t, listenerCh)
}

func TestHTTPListenerWithPrecision(t *testing.T) {
conf := testConfig()
listener := startHTTPListener(t, conf)
defer listener.Stop()

listenerCh, unsubListener := subListener(t)
defer unsubListener()

// Construct lines with timestamps. Seconds based timestamps will
// be sent into the listener which it should convert to nanosecond
// based timestamps on the way out.
in := bytes.NewBuffer(nil)
out := bytes.NewBuffer(nil)
for i, line := range poetry {
if i == 0 {
// No timestamp on the first line.
in.WriteString(line)
out.WriteString(line)
} else {
line = strings.TrimRight(line, "\n")
secs := int64(1500000000 + i)
nanos := secs * int64(time.Second)
in.WriteString(line + fmt.Sprintf(" %d\n", secs))
out.WriteString(line + fmt.Sprintf(" %d\n", nanos))
}
}

// Send the input lines.
url := fmt.Sprintf("http://localhost:%d/write?precision=s", listenPort)
_, err := http.Post(url, "text/plain", in)
require.NoError(t, err)

// Check for the expected output.
assertBatch(t, listenerCh, out.String())
assertNoMore(t, listenerCh)
}

func BenchmarkListenerLatency(b *testing.B) {
listener := startListener(b, testConfig())
defer listener.Stop()
Expand Down Expand Up @@ -328,6 +361,17 @@ func startListener(t require.TestingT, conf *config.Config) *Listener {
return listener
}

func startHTTPListener(t require.TestingT, conf *config.Config) *Listener {
listener, err := StartHTTPListener(conf)
require.NoError(t, err)
if !spouttest.CheckReadyProbe(conf.ProbePort) {
listener.Stop()
t.Errorf("HTTP listener not ready")
t.FailNow()
}
return listener
}

// dialListener creates a UDP connection to the listener's inbound port.
func dialListener(t require.TestingT) *net.UDPConn {
saddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("0.0.0.0:%d", listenPort))
Expand Down

0 comments on commit 842bcce

Please sign in to comment.