Skip to content

Commit

Permalink
Add threading at the UDP listener and packet handling level
Browse files Browse the repository at this point in the history
Co-authored-by: AustinSchoen <austin.schoen@gmail.com>
Signed-off-by: SpencerMalone <malone.spencer@gmail.com>
  • Loading branch information
SpencerMalone and AustinSchoen committed Apr 2, 2019
1 parent e5aa936 commit 5b125b5
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 11 deletions.
45 changes: 35 additions & 10 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,38 +642,63 @@ samples:

type StatsDUDPListener struct {
conn *net.UDPConn
bufferWatcher *telemetry.BufferWatcher
bufferWarning uint32
}

func (l *StatsDUDPListener) Listen(e chan<- Events) {
func (l *StatsDUDPListener) Listen(threadCount int, packetHandlers int, e chan<- Events) {
bufferWatcher, err := telemetry.NewBufferWatcher(l.conn)
l.bufferWarning = 0
watchBuffer := true
l.bufferWatcher = bufferWatcher
shouldWatchBuffer := true

if err != nil {
watchBuffer = false
shouldWatchBuffer = false
log.Debugf("Unable to watch UDP buffer size due to: %v", err)
}

l.bufferWarning = 0

concurrentHandlersPerThread := packetHandlers / threadCount

for i := 0; i < threadCount; i++ {
go l.Listener(e, concurrentHandlersPerThread, shouldWatchBuffer)
}
}

func (l *StatsDUDPListener) Listener(e chan<- Events, concurrentPacketHandlers int, shouldWatchBuffer bool) {
var sem = make(chan struct{}, concurrentPacketHandlers)
buf := make([]byte, 65535)
for {
n, _, err := l.conn.ReadFromUDP(buf)
if err != nil {
log.Fatal(err)
}
if watchBuffer && atomic.LoadUint32(&l.bufferWarning) != 1 {
go l.watchBufferSize(bufferWatcher)
if shouldWatchBuffer && atomic.LoadUint32(&l.bufferWarning) != 1 {
go l.watchBufferSize()
}

data := append([]byte(nil), buf[0:n]...)
select {
case sem <- struct{}{}:
{
go func() {
l.handlePacket(data[0:n], e)
<-sem
}()
}

default:
l.handlePacket(data[0:n], e)
}
l.handlePacket(buf[0:n], e)
}
}

func (l *StatsDUDPListener) watchBufferSize(bufferWatcher *telemetry.BufferWatcher) {
readBufferSize, err := bufferWatcher.GetSocketQueue()
func (l *StatsDUDPListener) watchBufferSize() {
readBufferSize, err := l.bufferWatcher.GetSocketQueue()
if err != nil {
log.Debugf("Failed to watch UDP buffer size due to: %v", err)
} else {
if readBufferSize+65535 > bufferWatcher.ReadBuffer {
if readBufferSize+65535 > l.bufferWatcher.ReadBuffer {
atomic.StoreUint32(&l.bufferWarning, 1)
log.Info("UDP buffer size is high enough that packet drops may have occurred. You may need to allocate more resources for the exporter, or increase the statsd.read-buffer parameter and the kernel parameter net.core.rmem_max")
}
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func main() {
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
listenerThreads = kingpin.Flag("udp-listener.threads", "The number of listener threads to receive UDP traffic.").Default("4").Int()
packetHandlers = kingpin.Flag("udp-listener.handlers", "The number of concurrent packet handlers").Default("1000").Int()
)

log.AddFlags(kingpin.CommandLine)
Expand Down Expand Up @@ -180,7 +182,7 @@ func main() {
}

ul := &StatsDUDPListener{conn: uconn}
go ul.Listen(events)
go ul.Listen(*listenerThreads, *packetHandlers, events)
}

if *statsdListenTCP != "" {
Expand Down
13 changes: 13 additions & 0 deletions pkg/telemetry/buffer.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
package telemetry

// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import (
"net"
)
Expand Down
13 changes: 13 additions & 0 deletions pkg/telemetry/buffer_linux.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
// +build linux

// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry

import (
Expand Down
13 changes: 13 additions & 0 deletions pkg/telemetry/buffer_other.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
// +build !linux

// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry

import (
Expand Down

0 comments on commit 5b125b5

Please sign in to comment.