diff --git a/listener/listener.go b/listener/listener.go index 600fcce..df19b84 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -73,7 +73,6 @@ func StartListener(c *config.Config) (_ *Listener, err error) { go listener.listenUDP(sc) log.Printf("UDP listener publishing to [%s] at %s", c.NATSSubject[0], c.NATSAddress) - listener.notifyState("ready") return listener, nil } @@ -93,11 +92,13 @@ func StartHTTPListener(c *config.Config) (*Listener, error) { go listener.listenHTTP(server) log.Printf("HTTP listener publishing to [%s] at %s", c.NATSSubject[0], c.NATSAddress) - listener.notifyState("ready") return listener, nil } +// Listener accepts measurements in InfluxDB Line Protocol format via +// UDP or HTTP, batches them and then publishes them to a NATS +// subject. type Listener struct { c *config.Config nc *nats.Conn @@ -118,6 +119,8 @@ func (l *Listener) Ready() <-chan struct{} { return l.ready } +// Stop shuts down a running listener. It should be called exactly +// once for every Listener instance. func (l *Listener) Stop() { close(l.stop) l.wg.Wait() @@ -147,7 +150,6 @@ func newListener(c *config.Config) (*Listener, error) { nc.Opts.MaxReconnect = -1 l.nc = nc - l.notifyState("boot") return l, nil } @@ -302,19 +304,6 @@ func (l *Listener) startStatistician() { } } -var notifyLine = lineformatter.New("spout_mon", nil, "type", "state", "pid") - -func (l *Listener) notifyState(state string) { - line := notifyLine.Format(nil, "listener", state, os.Getpid()) - if err := l.nc.Publish(l.c.NATSSubjectMonitor, line); err != nil { - l.handleNatsError(err) - return - } - if err := l.nc.Flush(); err != nil { - l.handleNatsError(err) - } -} - type timeouter interface { Timeout() bool }