diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 05e93e684254..57bdd3d0aee3 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -67,6 +67,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di - Update init scripts to use the `test config` subcommand instead of the deprecated `-configtest` flag. {issue}4600[4600] - Get by default the credentials for connecting to Kibana from the Elasticsearch output configuration. {pull}4867[4867] +- Move TCP UDP start up into `server.Start()` {pull}4903[4903] *Auditbeat* diff --git a/metricbeat/helper/server/http/http.go b/metricbeat/helper/server/http/http.go index 9346d4e473ba..e6aee784e9c0 100644 --- a/metricbeat/helper/server/http/http.go +++ b/metricbeat/helper/server/http/http.go @@ -57,7 +57,7 @@ func NewHttpServer(mb mb.BaseMetricSet) (server.Server, error) { return h, nil } -func (h *HttpServer) Start() { +func (h *HttpServer) Start() error { go func() { logp.Info("Starting http server on %s", h.server.Addr) @@ -67,6 +67,7 @@ func (h *HttpServer) Start() { } }() + return nil } func (h *HttpServer) Stop() { diff --git a/metricbeat/helper/server/server.go b/metricbeat/helper/server/server.go index cac937975892..c10f560c5fcf 100644 --- a/metricbeat/helper/server/server.go +++ b/metricbeat/helper/server/server.go @@ -11,7 +11,7 @@ const ( // Server is an interface that can be used to implement servers which can accept data. type Server interface { // Start is used to start the server at a well defined port. - Start() + Start() error // Stop the server. Stop() // Get a channel of events. diff --git a/metricbeat/helper/server/tcp/tcp.go b/metricbeat/helper/server/tcp/tcp.go index a2ea7819779f..814316762827 100644 --- a/metricbeat/helper/server/tcp/tcp.go +++ b/metricbeat/helper/server/tcp/tcp.go @@ -4,6 +4,8 @@ import ( "fmt" "net" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/helper/server" @@ -11,6 +13,7 @@ import ( ) type TcpServer struct { + tcpAddr *net.TCPAddr listener *net.TCPListener receiveBufferSize int done chan struct{} @@ -42,25 +45,27 @@ func NewTcpServer(base mb.BaseMetricSet) (server.Server, error) { return nil, err } - listener, err := net.ListenTCP("tcp", addr) - if err != nil { - return nil, err - } - - logp.Info("Started listening for TCP on: %s:%d", config.Host, config.Port) return &TcpServer{ - listener: listener, + tcpAddr: addr, receiveBufferSize: config.ReceiveBufferSize, done: make(chan struct{}), eventQueue: make(chan server.Event), }, nil } -func (g *TcpServer) Start() { - go g.WatchMetrics() +func (g *TcpServer) Start() error { + listener, err := net.ListenTCP("tcp", g.tcpAddr) + if err != nil { + return errors.Wrap(err, "failed to start TCP server") + } + g.listener = listener + logp.Info("Started listening for TCP on: %s", g.tcpAddr.String()) + + go g.watchMetrics() + return nil } -func (g *TcpServer) WatchMetrics() { +func (g *TcpServer) watchMetrics() { buffer := make([]byte, g.receiveBufferSize) for { select { diff --git a/metricbeat/helper/server/tcp/tcp_test.go b/metricbeat/helper/server/tcp/tcp_test.go index 6fd1a55c5a73..a57848e22c9e 100644 --- a/metricbeat/helper/server/tcp/tcp_test.go +++ b/metricbeat/helper/server/tcp/tcp_test.go @@ -20,14 +20,9 @@ func GetTestTcpServer(host string, port int) (server.Server, error) { return nil, err } - listener, err := net.ListenTCP("tcp", addr) - if err != nil { - return nil, err - } - logp.Info("Started listening for TCP on: %s:%d", host, port) return &TcpServer{ - listener: listener, + tcpAddr: addr, receiveBufferSize: 1024, done: make(chan struct{}), eventQueue: make(chan server.Event), @@ -43,7 +38,12 @@ func TestTcpServer(t *testing.T) { t.FailNow() } - svc.Start() + err = svc.Start() + if err != nil { + t.Error(err) + t.FailNow() + } + defer svc.Stop() writeToServer(t, "test1", host, port) msg := <-svc.GetEvents() diff --git a/metricbeat/helper/server/udp/udp.go b/metricbeat/helper/server/udp/udp.go index fc476b702071..124e4b0a5029 100644 --- a/metricbeat/helper/server/udp/udp.go +++ b/metricbeat/helper/server/udp/udp.go @@ -4,6 +4,8 @@ import ( "fmt" "net" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/helper/server" @@ -11,6 +13,7 @@ import ( ) type UdpServer struct { + udpaddr *net.UDPAddr listener *net.UDPConn receiveBufferSize int done chan struct{} @@ -43,25 +46,28 @@ func NewUdpServer(base mb.BaseMetricSet) (server.Server, error) { return nil, err } - listener, err := net.ListenUDP("udp", addr) - if err != nil { - return nil, err - } - - logp.Info("Started listening for UDP on: %s:%d", config.Host, config.Port) return &UdpServer{ - listener: listener, + udpaddr: addr, receiveBufferSize: config.ReceiveBufferSize, done: make(chan struct{}), eventQueue: make(chan server.Event), }, nil } -func (g *UdpServer) Start() { - go g.WatchMetrics() +func (g *UdpServer) Start() error { + listener, err := net.ListenUDP("udp", g.udpaddr) + if err != nil { + return errors.Wrap(err, "failed to start UDP server") + } + + logp.Info("Started listening for UDP on: %s", g.udpaddr.String()) + g.listener = listener + + go g.watchMetrics() + return nil } -func (g *UdpServer) WatchMetrics() { +func (g *UdpServer) watchMetrics() { buffer := make([]byte, g.receiveBufferSize) for { select { diff --git a/metricbeat/helper/server/udp/udp_test.go b/metricbeat/helper/server/udp/udp_test.go index 737f88dd4e57..bb2c0a10c6e8 100644 --- a/metricbeat/helper/server/udp/udp_test.go +++ b/metricbeat/helper/server/udp/udp_test.go @@ -20,14 +20,9 @@ func GetTestUdpServer(host string, port int) (server.Server, error) { return nil, err } - listener, err := net.ListenUDP("udp", addr) - if err != nil { - return nil, err - } - logp.Info("Started listening for UDP on: %s:%d", host, port) return &UdpServer{ - listener: listener, + udpaddr: addr, receiveBufferSize: 1024, done: make(chan struct{}), eventQueue: make(chan server.Event), @@ -44,6 +39,11 @@ func TestUdpServer(t *testing.T) { } svc.Start() + if err != nil { + t.Error(err) + t.FailNow() + } + defer svc.Stop() writeToServer(t, "test1", host, port) msg := <-svc.GetEvents() diff --git a/metricbeat/module/graphite/server/server.go b/metricbeat/module/graphite/server/server.go index fac1154b5075..e06fb3140bb4 100644 --- a/metricbeat/module/graphite/server/server.go +++ b/metricbeat/module/graphite/server/server.go @@ -1,7 +1,10 @@ package server import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" serverhelper "github.com/elastic/beats/metricbeat/helper/server" "github.com/elastic/beats/metricbeat/helper/server/tcp" "github.com/elastic/beats/metricbeat/helper/server/udp" @@ -61,7 +64,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Run method provides the Graphite server with a reporter with which events can be reported. func (m *MetricSet) Run(reporter mb.PushReporter) { // Start event watcher - m.server.Start() + if err := m.server.Start(); err != nil { + err = errors.Wrap(err, "failed to start graphite server") + logp.Err("%v", err) + reporter.Error(err) + return + } for { select {