Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple threading to UDP packet handling, dump message when UDP buffers likely overflowed. #196

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 55 additions & 3 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"
"unicode/utf8"

Expand All @@ -34,6 +35,7 @@ import (

"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/telemetry"
)

const (
Expand Down Expand Up @@ -639,17 +641,67 @@ samples:
}

type StatsDUDPListener struct {
conn *net.UDPConn
conn *net.UDPConn
bufferWatcher *telemetry.BufferWatcher
bufferWarning uint32
}

func (l *StatsDUDPListener) Listen(e chan<- Events) {
func (l *StatsDUDPListener) Listen(threadCount int, packetHandlers int, e chan<- Events) {
bufferWatcher, err := telemetry.NewBufferWatcher(l.conn)
l.bufferWatcher = bufferWatcher
shouldWatchBuffer := true

if err != nil {
shouldWatchBuffer = false
log.Debugf("Unable to watch UDP buffer size due to: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this always happen on non-Linux platforms?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this always log? Yes, it would. I figured a debug log would be a good place to quietly drop that to minimize confusion over why they can't get the buffer tracking to work, but if you think it's unnecessary, I would be happy to more quietly short circuit that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's fine as it is, just trying to understand! Maybe add a comment to say how we expect the initialization to behave on non-Linux systems?

}

l.bufferWarning = 0

concurrentHandlersPerThread := packetHandlers / threadCount

for i := 0; i < threadCount; i++ {
go l.Listener(e, concurrentHandlersPerThread, shouldWatchBuffer)
}
}

func (l *StatsDUDPListener) Listener(e chan<- Events, concurrentPacketHandlers int, shouldWatchBuffer bool) {
var sem = make(chan struct{}, concurrentPacketHandlers)
buf := make([]byte, 65535)
for {
n, _, err := l.conn.ReadFromUDP(buf)
if err != nil {
log.Fatal(err)
}
l.handlePacket(buf[0:n], e)
if shouldWatchBuffer && atomic.LoadUint32(&l.bufferWarning) != 1 {
go l.watchBufferSize()
}

data := append([]byte(nil), buf[0:n]...)
select {
case sem <- struct{}{}:
{
go func() {
l.handlePacket(data[0:n], e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it safe to call handlePacket concurrently?

why a semaphore and goroutines per handlePacket call, vs long-lived goroutines that consume off a (buffered) channel? IIUC, with the buffered channel it would be possible to try to write an event into this channel, but if the buffer is full, increment a drop counter and move on?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is safe! At least, I ran this with a -race build with no complaints, and have ~22 deployments of this happily chugging along at a pretty high volume and no related crashes.

Re: Semaphore - I think that is a great suggestion, this was originally @AustinSchoen's work, lemme collab with him. The idea of reading UDP packets as fast as we can and dropping them if the processing can't keep up instead of blocking (but keeping track of that drop tally) is so elegant, very +++ to headed down that path. That should totally alleviate my need for procfs metrics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all credit for that idea goes to @rtreffer!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe I went with a semaphore because it was the first example of a tunable queue of workers that I could find, there are multiple ways of handling this model. We were still early in the experimentation phase as to what amount and kind of concurrency we needed to handle the firehose of UDP packets coming at our exporters. Regardless I don't think we'd want to just drop data on the floor if we don't have the capacity to handle it in every situation. Those packets should back up into the protocol buffer while waiting to be read by the configured number of handlers.

In situations where a system is overloaded only for a few (mili)seconds it would be better to block read off the protocol buffer and wait to handle them than to drop and log that we lost packets IMO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we'd want to just drop data on the floor if we don't have the capacity to handle it in every situation

I agree, and I don't propose that we do that. We can buffer again after the initial receive.

However, at some point we have to drop, and we want to know when that happens. I would rather guarantee that that happens in a place where we can easily observe it (i.e. in user-lang Go code) than introduce complex platform-dependent monitoring of the socket buffers themselves.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With "buffer again" I mean stuff them into a buffered channel, we don't have to get fancy about it.

<-sem
}()
}

default:
l.handlePacket(data[0:n], e)
}
}
}

func (l *StatsDUDPListener) watchBufferSize() {
readBufferSize, err := l.bufferWatcher.GetSocketQueue()
if err != nil {
log.Debugf("Failed to watch UDP buffer size due to: %v", err)
} else {
if readBufferSize+65535 > l.bufferWatcher.ReadBuffer {
atomic.StoreUint32(&l.bufferWarning, 1)
log.Info("UDP buffer size is high enough that packet drops may have occurred. You may need to allocate more resources for the exporter, or increase the statsd.read-buffer parameter and the kernel parameter net.core.rmem_max")
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/howeyc/fsnotify v0.0.0-20151003194602-f0c08ee9c607
github.com/kr/pretty v0.1.0 // indirect
github.com/mattn/go-isatty v0.0.4 // indirect
github.com/mdlayher/netlink v0.0.0-20190313131330-258ea9dff42c
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with this package after it's use in https://github.com/prometheus/node_exporter

github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -19,7 +20,7 @@ require (
github.com/sergi/go-diff v1.0.0 // indirect
github.com/sirupsen/logrus v1.0.3 // indirect
github.com/stretchr/testify v1.2.2 // indirect
golang.org/x/crypto v0.0.0-20170825220121-81e90905daef // indirect
golang.org/x/sys v0.0.0-20190312061237-fead79001313
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.5
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
Expand Down
13 changes: 11 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/howeyc/fsnotify v0.0.0-20151003194602-f0c08ee9c607 h1:+7wvV++11s0Okyl1dekihkIiCIYDz+Qk2LvxAShINU4=
github.com/howeyc/fsnotify v0.0.0-20151003194602-f0c08ee9c607/go.mod h1:41HzSPxBGeFRQKEEwgh49TRw/nKBsYZ2cF1OzPjSJsA=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
Expand All @@ -29,6 +31,8 @@ github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mdlayher/netlink v0.0.0-20190313131330-258ea9dff42c h1:qYXI+3AN4zBWsTF5drEu1akWPu2juaXPs58tZ4/GaCg=
github.com/mdlayher/netlink v0.0.0-20190313131330-258ea9dff42c/go.mod h1:eQB3mZE4aiYnlUsyGGCOpPETfdQq4Jhsgf1fk3cwQaA=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand All @@ -50,17 +54,22 @@ github.com/sirupsen/logrus v1.0.3 h1:B5C/igNWoiULof20pKfY4VntcIPqKuwEmoLZrabbUrc
github.com/sirupsen/logrus v1.0.3/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20170825220121-81e90905daef h1:R8ubLIilYRXIXpgjOg2l/ECVs3HzVKIjJEhxSsQ91u4=
golang.org/x/crypto v0.0.0-20170825220121-81e90905daef/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc h1:a3CU5tJYVj92DY2LaA1kUkrsqD5/3mLDhx2NcNqyW+0=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313 h1:pczuHS43Cp2ktBEEmLwScxgjWsBSzdaQiKzUyf3DTTc=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo=
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func main() {
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
listenerThreads = kingpin.Flag("udp-listener.threads", "The number of listener threads to receive UDP traffic.").Default("4").Int()
packetHandlers = kingpin.Flag("udp-listener.handlers", "The number of concurrent packet handlers").Default("1000").Int()
)

log.AddFlags(kingpin.CommandLine)
Expand Down Expand Up @@ -180,7 +182,7 @@ func main() {
}

ul := &StatsDUDPListener{conn: uconn}
go ul.Listen(events)
go ul.Listen(*listenerThreads, *packetHandlers, events)
}

if *statsdListenTCP != "" {
Expand Down
23 changes: 23 additions & 0 deletions pkg/telemetry/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package telemetry

// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import (
"net"
)

type BufferWatcher struct {
uAddr *net.UDPAddr
ReadBuffer int
}
160 changes: 160 additions & 0 deletions pkg/telemetry/buffer_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// +build linux

// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry

import (
"bytes"
"encoding/binary"
"errors"
"net"
"strconv"
"syscall"
"unsafe"

"github.com/mdlayher/netlink"
"golang.org/x/sys/unix"
)

const SOCK_DIAG_BY_FAMILY = 20

type inetDiagSockId struct {
SourcePort uint16
DestPort uint16
SourceAddress [4]uint32
DestAddress [4]uint32
If uint32
Cookie [2]uint32
}

type inetDiagReqV2 struct {
Family uint8
Protocol uint8
Ext uint8
_ uint8
States uint32
Id inetDiagSockId
}

type InetDiagMsgData struct {
Family uint8
State uint8
Timer uint8
Retrans uint8
Id inetDiagSockId
Expires uint32
Rqueue uint32
Wqueue uint32
Uid uint32
Inode uint32
}

// MarshalBinary marshals a Message into a byte slice.
func (m inetDiagReqV2) MarshalBinary() ([]byte, error) {
var buf bytes.Buffer
if err := binary.Write(&buf, binary.LittleEndian, &m); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func convert_addr_to_int32(ip []byte) (ret [4]uint32) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use camelCase instead of snake_case

buf := bytes.NewBuffer(ip)
binary.Read(buf, binary.BigEndian, &ret)
return
}

func convert_port_to_u16(port int) uint16 {
uport := uint16(port)
portdat := make([]byte, 6)
binary.LittleEndian.PutUint16(portdat, uport)
uport = binary.BigEndian.Uint16(portdat)
return uport
}

func (b *BufferWatcher) GetSocketQueue() (int, error) {
c, err := netlink.Dial(unix.NETLINK_SOCK_DIAG, nil)
if err != nil {
return 0, err
}
defer c.Close()

r := inetDiagReqV2{
Family: unix.AF_INET6,
Protocol: unix.IPPROTO_UDP,
Ext: 255,
Id: inetDiagSockId{
SourcePort: convert_port_to_u16(b.uAddr.Port),
SourceAddress: convert_addr_to_int32(b.uAddr.IP),
},
States: 0xffffffff,
}

data, err := r.MarshalBinary()
if err != nil {
return 0, err
}

req := netlink.Message{
Header: netlink.Header{
Flags: netlink.Root | netlink.Match |
netlink.Request,
Type: SOCK_DIAG_BY_FAMILY,
},
Data: data,
}

// Perform a request, receive replies, and validate the replies
msgs, err := c.Execute(req)
if err != nil {
panic(err)
}

msg_count := len(msgs)

if msg_count == 1 {
m := msgs[0]
var data *InetDiagMsgData = *(**InetDiagMsgData)(unsafe.Pointer(&m.Data))

return int(data.Rqueue), nil
} else {
return 0, errors.New("Netlink returned an unexpected number of sockets: " + strconv.Itoa(msg_count))
}
}

func getReadBuffer(uConn *net.UDPConn) (int, error) {
file, err := uConn.File()
if err != nil {
return 0, err
}

readBuffer, err := unix.GetsockoptInt(int(file.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF)
if err != nil {
return 0, err
}
return readBuffer, nil
}

func NewBufferWatcher(uConn *net.UDPConn) (*BufferWatcher, error) {
readBuffer, err := getReadBuffer(uConn)
if err != nil {
return &BufferWatcher{}, err
}

return &BufferWatcher{
ReadBuffer: readBuffer,
uAddr: uConn.LocalAddr().(*net.UDPAddr),
}, nil
}
29 changes: 29 additions & 0 deletions pkg/telemetry/buffer_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// +build !linux

// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry

import (
"errors"
"net"
)

func NewBufferWatcher(uConn *net.UDPConn) (*BufferWatcher, error) {
return &BufferWatcher{}, errors.New("UDP Buffer watching unsupported on this OS")
}

func (b *BufferWatcher) GetSocketQueue() (int, error) {
return 0, errors.New("UDP Buffer watching unsupported on this OS")
}
2 changes: 2 additions & 0 deletions vendor/github.com/mdlayher/netlink/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading