Skip to content

Commit

Permalink
Fix reload panic in socket_listener input plugin (#6218)
Browse files Browse the repository at this point in the history
(cherry picked from commit 60c8f38)
  • Loading branch information
sgtsquiggs authored and danielnelson committed Aug 6, 2019
1 parent 0ec40de commit 4d0ddec
Showing 1 changed file with 24 additions and 3 deletions.
27 changes: 24 additions & 3 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type streamSocketListener struct {
func (ssl *streamSocketListener) listen() {
ssl.connections = map[string]net.Conn{}

wg := sync.WaitGroup{}

for {
c, err := ssl.Accept()
if err != nil {
Expand Down Expand Up @@ -67,14 +69,20 @@ func (ssl *streamSocketListener) listen() {
ssl.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", ssl.ServiceAddress, err))
}

go ssl.read(c)
wg.Add(1)
go func() {
defer wg.Done()
ssl.read(c)
}()
}

ssl.connectionsMtx.Lock()
for _, c := range ssl.connections {
c.Close()
}
ssl.connectionsMtx.Unlock()

wg.Wait()
}

func (ssl *streamSocketListener) setKeepAlive(c net.Conn) error {
Expand Down Expand Up @@ -169,6 +177,8 @@ type SocketListener struct {
SocketMode string `toml:"socket_mode"`
tlsint.ServerConfig

wg sync.WaitGroup

parsers.Parser
telegraf.Accumulator
io.Closer
Expand Down Expand Up @@ -302,7 +312,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
}

sl.Closer = ssl
go ssl.listen()
sl.wg = sync.WaitGroup{}
sl.wg.Add(1)
go func() {
defer sl.wg.Done()
ssl.listen()
}()
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
pc, err := udpListen(protocol, addr)
if err != nil {
Expand Down Expand Up @@ -336,7 +351,12 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
}

sl.Closer = psl
go psl.listen()
sl.wg = sync.WaitGroup{}
sl.wg.Add(1)
go func() {
defer sl.wg.Done()
psl.listen()
}()
default:
return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, sl.ServiceAddress)
}
Expand Down Expand Up @@ -378,6 +398,7 @@ func (sl *SocketListener) Stop() {
sl.Close()
sl.Closer = nil
}
sl.wg.Wait()
}

func newSocketListener() *SocketListener {
Expand Down

0 comments on commit 4d0ddec

Please sign in to comment.